Source code for nest.tests.test_connect_helpers

# -*- coding: utf-8 -*-
#
# test_connect_helpers.py
#
# This file is part of NEST.
#
# Copyright (C) 2004 The NEST Initiative
#
# NEST is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# NEST is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with NEST.  If not, see <http://www.gnu.org/licenses/>.

import numpy as np
import scipy.stats
import nest
from scipy.stats import truncexpon

try:
    from mpi4py import MPI
    haveMPI4Py = True
except:
    haveMPI4Py = False


[docs]def gather_data(data_array): ''' Gathers data from all mpi processes by collecting all element in a list if data is a list and summing all elements to one numpy-array if data is one numpy-array. Returns gathered data if rank of current mpi node is zero and None otherwise. ''' if haveMPI4Py: data_array_list = MPI.COMM_WORLD.gather(data_array, root=0) if MPI.COMM_WORLD.Get_rank() == 0: if isinstance(data_array, list): gathered_data = [ item for sublist in data_array_list for item in sublist] else: gathered_data = sum(data_array_list) return gathered_data else: return None else: return data_array
[docs]def is_array(data): ''' Returns True if data is a list or numpy-array and False otherwise. ''' return isinstance(data, (list, np.ndarray, np.generic))
[docs]def mpi_barrier(): if haveMPI4Py: MPI.COMM_WORLD.Barrier()
[docs]def mpi_assert(data_original, data_test, TestCase): ''' Compares data_original and data_test using assertTrue from the TestCase. ''' data_original = gather_data(data_original) # only test if on rank 0 if data_original is not None: if isinstance(data_original, (np.ndarray, np.generic)) \ and isinstance(data_test, (np.ndarray, np.generic)): TestCase.assertTrue(np.allclose(data_original, data_test)) else: TestCase.assertTrue(data_original == data_test)
[docs]def all_equal(x): ''' Tests if all elements in a list are equal. Returns True or False ''' return x.count(x[0]) == len(x)
[docs]def get_connectivity_matrix(pop1, pop2): ''' Returns a connectivity matrix describing all connections from pop1 to pop2 such that M_ij describes the connection between the jth neuron in pop1 to the ith neuron in pop2. ''' M = np.zeros((len(pop2), len(pop1))) connections = nest.GetConnections(pop1, pop2) index_dic = {} pop1 = np.asarray(pop1) pop2 = np.asarray(pop2) for node in pop1: index_dic[node] = np.where(pop1 == node)[0][0] for node in pop2: index_dic[node] = np.where(pop2 == node)[0][0] for conn in connections: source_id = conn[0] target_id = conn[1] M[index_dic[target_id]][index_dic[source_id]] += 1 return M
[docs]def get_weighted_connectivity_matrix(pop1, pop2, label): ''' Returns a weighted connectivity matrix describing all connections from pop1 to pop2 such that M_ij describes the connection between the jth neuron in pop1 to the ith neuron in pop2. Only works without multapses. ''' M = np.zeros((len(pop2), len(pop1))) connections = nest.GetConnections(pop1, pop2) index_dic = {} pop1 = np.asarray(pop1) pop2 = np.asarray(pop2) for node in pop1: index_dic[node] = np.where(pop1 == node)[0][0] for node in pop2: index_dic[node] = np.where(pop2 == node)[0][0] for conn in connections: source_id = conn[0] target_id = conn[1] weight = nest.GetStatus(nest.GetConnections( [source_id], [target_id]))[0][label] M[index_dic[target_id]][index_dic[source_id]] += weight return M
[docs]def check_synapse(params, values, syn_params, TestCase): for i, param in enumerate(params): syn_params[param] = values[i] TestCase.setUpNetwork(TestCase.conn_dict, syn_params) for i, param in enumerate(params): conns = nest.GetStatus(nest.GetConnections( TestCase.pop1, TestCase.pop2)) conn_params = [conn[param] for conn in conns] TestCase.assertTrue(all_equal(conn_params)) TestCase.assertTrue(conn_params[0] == values[i])
# copied from Masterthesis, Daniel Hjertholm
[docs]def counter(x, fan, source_pop, target_pop): ''' Count similar elements in list. Parameters ---------- x: Any list. Return values ------------- list containing counts of similar elements. ''' N_p = len(source_pop) if fan == 'in' else len(target_pop) # of pool nodes. start = min(x) counts = [0] * N_p for elem in x: counts[elem - start] += 1 return counts
[docs]def get_degrees(fan, pop1, pop2): M = get_connectivity_matrix(pop1, pop2) if fan == 'in': degrees = np.sum(M, axis=1) elif fan == 'out': degrees = np.sum(M, axis=0) return degrees
# adapted from Masterthesis, Daniel Hjertholm
[docs]def get_expected_degrees_fixedDegrees(N, fan, len_source_pop, len_target_pop): N_d = len_target_pop if fan == 'in' else len_source_pop # of driver nodes. N_p = len_source_pop if fan == 'in' else len_target_pop # of pool nodes. expected_degree = N_d * N / float(N_p) expected = [expected_degree] * N_p return expected
# adapted from Masterthesis, Daniel Hjertholm
[docs]def get_expected_degrees_totalNumber(N, fan, len_source_pop, len_target_pop): expected_indegree = [N / float(len_target_pop)] * len_target_pop expected_outdegree = [N / float(len_source_pop)] * len_source_pop if fan == 'in': return expected_indegree elif fan == 'out': return expected_outdegree
# copied from Masterthesis, Daniel Hjertholm
[docs]def get_expected_degrees_bernoulli(p, fan, len_source_pop, len_target_pop): ''' Calculate expected degree distribution. Degrees with expected number of observations below e_min are combined into larger bins. Return values ------------- 2D array. The four columns contain degree, expected number of observation, actual number observations, and the number of bins combined. ''' n = len_source_pop if fan == 'in' else len_target_pop n_p = len_target_pop if fan == 'in' else len_source_pop mid = int(round(n * p)) e_min = 5 # Combine from front. data_front = [] cumexp = 0.0 bins_combined = 0 for degree in range(mid): cumexp += scipy.stats.binom.pmf(degree, n, p) * n_p bins_combined += 1 if cumexp < e_min: if degree == mid - 1: if len(data_front) == 0: raise RuntimeWarning('Not enough data') deg, exp, obs, num = data_front[-1] data_front[-1] = (deg, exp + cumexp, obs, num + bins_combined) else: continue else: data_front.append((degree - bins_combined + 1, cumexp, 0, bins_combined)) cumexp = 0.0 bins_combined = 0 # Combine from back. data_back = [] cumexp = 0.0 bins_combined = 0 for degree in reversed(range(mid, n + 1)): cumexp += scipy.stats.binom.pmf(degree, n, p) * n_p bins_combined += 1 if cumexp < e_min: if degree == mid: if len(data_back) == 0: raise RuntimeWarning('Not enough data') deg, exp, obs, num = data_back[-1] data_back[-1] = (degree, exp + cumexp, obs, num + bins_combined) else: continue else: data_back.append((degree, cumexp, 0, bins_combined)) cumexp = 0.0 bins_combined = 0 data_back.reverse() expected = np.array(data_front + data_back) if fan == 'out': assert (sum(expected[:, 3]) == len_target_pop + 1) else: # , 'Something is wrong' assert (sum(expected[:, 3]) == len_source_pop + 1) # np.hstack((np.asarray(data_front)[0], np.asarray(data_back)[0])) return expected
# adapted from Masterthesis, Daniel Hjertholm
[docs]def reset_seed(seed, nr_threads): ''' Reset the simulator and seed the PRNGs. Parameters ---------- seed: PRNG seed value. ''' nest.ResetKernel() nest.SetKernelStatus({'local_num_threads': nr_threads}) nr_procs = nest.GetStatus([0])[0]['total_num_virtual_procs'] seeds = [((nr_procs + 1) * seed + k) for k in range(nr_procs)] nest.SetKernelStatus({'rng_seeds': seeds}) nest.SetKernelStatus({'grng_seed': nr_procs * (seed + 1) + seed})
# copied from Masterthesis, Daniel Hjertholm
[docs]def chi_squared_check(degrees, expected, distribution=None): ''' Create a single network and compare the resulting degree distribution with the expected distribution using Pearson's chi-squared GOF test. Parameters ---------- seed : PRNG seed value. control: Boolean value. If True, _generate_multinomial_degrees will be used instead of _get_degrees. Return values ------------- chi-squared statistic. p-value from chi-squared test. ''' if distribution in ('pairwise_bernoulli', 'symmetric_pairwise_bernoulli'): observed = {} for degree in degrees: if degree not in observed: observed[degree] = 1 else: observed[degree] += 1 # Add observations to data structure, combining multiple observations # where necessary. expected[:, 2] = 0.0 for row in expected: for i in range(int(row[3])): deg = int(row[0]) + i if deg in observed: row[2] += observed[deg] # ddof: adjustment to the degrees of freedom. df = k-1-ddof return scipy.stats.chisquare(np.array(expected[:, 2]), np.array(expected[:, 1])) else: # ddof: adjustment to the degrees of freedom. df = k-1-ddof return scipy.stats.chisquare(np.array(degrees), np.array(expected))
# copied from Masterthesis, Daniel Hjertholm
[docs]def two_level_check(n_runs, degrees, expected, verbose=True): ''' Create a network and run chi-squared GOF test n_runs times. Test whether resulting p-values are uniformly distributed on [0, 1] using the Kolmogorov-Smirnov GOF test. Parameters ---------- n_runs : Number of times to repeat chi-squared test. start_seed: First PRNG seed value. control : Boolean value. If True, _generate_multinomial_degrees will be used instead of _get_degrees. verbose : Boolean value, determining whether to print progress. Return values ------------- KS statistic. p-value from KS test. ''' pvalues = [] for i in range(n_runs): if verbose: print("Running test %d of %d." % (i + 1, n_runs)) chi, p = chi_squared_check(degrees, expected) pvalues.append(p) ks, p = scipy.stats.kstest(pvalues, 'uniform') return ks, p
# copied from Masterthesis, Daniel Hjertholm
[docs]def adaptive_check(stat_dict, degrees, expected): ''' Create a single network using fixed in/outdegrees and run a chi-squared GOF test on the connection distribution. If the result is extreme (high or low), run a two-level test. Parameters ---------- test : Instance of RCC_tester or RDC_tester class. n_runs: If chi-square test fails, test is repeated n_runs times, and the KS test is used to analyze results. Return values ------------- boolean value. True if test was passed, False otherwise. ''' chi, p = chi_squared_check(degrees, expected) if stat_dict['alpha1_low'] < p < stat_dict['alpha1_up']: return True else: ks, p = two_level_check(stat_dict['n_runs'], degrees, expected) return True if p > stat_dict['alpha2'] else False
[docs]def get_clipped_cdf(params): def clipped_cdf(x): if params['distribution'] == 'lognormal_clipped': cdf_low = scipy.stats.lognorm.cdf( params['low'], params['sigma'], 0, np.exp(params['mu'])) cdf_x = scipy.stats.lognorm.cdf( x, params['sigma'], 0, np.exp(params['mu'])) cdf_high = scipy.stats.lognorm.cdf( params['high'], params['sigma'], 0, np.exp(params['mu'])) elif params['distribution'] == 'exponential_clipped': cdf_low = scipy.stats.expon.cdf( params['low'], 0, 1. / params['lambda']) cdf_x = scipy.stats.expon.cdf(x, 0, 1. / params['lambda']) cdf_high = scipy.stats.expon.cdf( params['high'], 0, 1. / params['lambda']) elif params['distribution'] == 'gamma_clipped': cdf_low = scipy.stats.gamma.cdf( params['low'], params['order'], 0, params['scale']) cdf_x = scipy.stats.gamma.cdf( x, params['order'], 0, params['scale']) cdf_high = scipy.stats.gamma.cdf( params['high'], params['order'], 0, params['scale']) else: raise ValueError("Clipped {} distribution not supported.".format( params['distribution'])) cdf = (cdf_x - cdf_low) / (cdf_high - cdf_low) cdf[cdf < 0] = 0 cdf[cdf > 1] = 1 return cdf return clipped_cdf
[docs]def get_expected_freqs(params, x, N): if params['distribution'] == 'uniform_int': expected = scipy.stats.randint.pmf( x, params['low'], params['high'] + 1) * N elif (params['distribution'] == 'binomial' or params['distribution'] == 'binomial_clipped'): expected = scipy.stats.binom.pmf(x, params['n'], params['p']) * N elif params['distribution'] == 'poisson': expected = scipy.stats.poisson.pmf(x, params['lambda']) * N elif params['distribution'] == 'poisson_clipped': x = np.arange( scipy.stats.poisson.ppf(1e-8, params['lambda']), scipy.stats.poisson.ppf(0.9999, params['lambda']) ) expected = scipy.stats.poisson.pmf(x, params['lambda']) expected = expected[np.where(x <= params['high'])] x = x[np.where(x <= params['high'])] expected = expected[np.where(x >= params['low'])] expected = expected / np.sum(expected) * N return expected
[docs]def check_ks(pop1, pop2, label, alpha, params): clipped_dists = ['exponential_clipped', 'gamma_clipped', 'lognormal_clipped'] discrete_dists = ['binomial', 'poisson', 'uniform_int', 'binomial_clipped', 'poisson_clipped'] M = get_weighted_connectivity_matrix(pop1, pop2, label) M = M.flatten() if params['distribution'] in discrete_dists: frequencies = scipy.stats.itemfreq(M) expected = get_expected_freqs(params, frequencies[:, 0], len(M)) chi, p = scipy.stats.chisquare(frequencies[:, 1], expected) elif params['distribution'] in clipped_dists: D, p = scipy.stats.kstest(M, get_clipped_cdf( params), alternative='two-sided') else: if params['distribution'] == 'normal': distrib = scipy.stats.norm args = params['mu'], params['sigma'] elif params['distribution'] == 'normal_clipped': distrib = scipy.stats.truncnorm args = ( (params['low'] - params['mu']) / params['sigma'] if 'low' in params else -np.inf, (params['high'] - params['mu']) / params['sigma'] if 'high' in params else -np.inf, params['mu'], params['sigma'] ) elif params['distribution'] == 'exponential': distrib = scipy.stats.expon args = 0, 1. / params['lambda'] elif params['distribution'] == 'gamma': distrib = scipy.stats.gamma args = params['order'], 0, params['scale'] elif params['distribution'] == 'lognormal': distrib = scipy.stats.lognorm args = params['sigma'], 0, np.exp(params['mu']) elif params['distribution'] == 'uniform': distrib = scipy.stats.uniform args = params['low'], params['high'] - params['low'] else: raise ValueError("{} distribution not supported.".format( params['distribution'])) D, p = scipy.stats.kstest( M, distrib.cdf, args=args, alternative='two-sided') return p > alpha