diff --git a/CHANGES.txt b/CHANGES.txt index ef522a8..55091b6 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,2 +1,3 @@ v0.1.0: Initial release -v0.2.0: Spike Time Dependent Plasticity included. Heterogenous neuron API. Bug fixes to synaptic step functions. \ No newline at end of file +v0.2.0: Spike Time Dependent Plasticity included. Heterogenous neuron API. Bug fixes to synaptic step functions. +v0.3.0: Model can be saved and loaded. Simulation can be continued with multiple calls to model.NeuromorphicModel.simulate. model.NeuromorphicModel.setup provides the option to reset the simulation temporally, while preserving weights. API functions renamed: model.NeuromorphicModel.get_spikes -> model.NeuromorphicModel.get_spike_times and model.NeuromorphicModel.spike -> model.NeuromorphicModel.add_spike. Bug fixes to synaptic delay, compress_tensor and shared memory use in CPU mode. Customizable learning parameters. \ No newline at end of file diff --git a/setup.py b/setup.py index 811c3ac..899cbc6 100755 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ setup( name="SuperNeuroABM", - version="0.2.0", + version="0.3.0", author="Chathika Gunaratne, Prasanna Date, Shruti Kulkarni, Xi Zhang", author_email="gunaratnecs@ornl.gov", packages=["superneuroabm", "superneuroabm.core"], @@ -13,5 +13,5 @@ long_description="""A GPU-based multi-agent simulation framework for neuromorphic computing.""", long_description_content_type="text/markdown", project_urls={"Source": "https://github.com/ORNL/superneuroabm"}, - install_requires=["numba==0.55.1", "numpy==1.21.6", "tqdm==4.64.1"], + install_requires=["numba==0.55.1", "numpy==1.21.6", "tqdm==4.64.1"], ) diff --git a/superneuroabm/core/agent.py b/superneuroabm/core/agent.py index b57dd4a..f094928 100755 --- a/superneuroabm/core/agent.py +++ b/superneuroabm/core/agent.py @@ -3,7 +3,6 @@ from collections import OrderedDict from copy import copy import warnings -from multiprocessing import shared_memory import numpy as np from numba import 
cuda @@ -172,7 +171,7 @@ def set_agent_property_value( self._property_name_2_agent_data_tensor[property_name][ agent_id ] = value - if dims: + if dims != None: if self._property_name_2_max_dims[property_name]: if len(dims) != len( self._property_name_2_max_dims[property_name] @@ -230,16 +229,20 @@ def generate_agent_data_tensors( adt = convert_to_equal_side_tensor(adt, max_dims) if use_cuda: adt = cuda.to_device(adt) - else: - # use shared memory if not cuda - d_size = np.dtype(dtype).itemsize * np.prod(adt.shape) - shm = shared_memory.SharedMemory( - create=True, - size=d_size, - name=f"npshared{property_name}", - ) - dst = np.ndarray(shape=adt.shape, dtype=dtype, buffer=shm.buf) - dst[:] = adt[:] + '''else: + # TODO fix shared memory usage + if False: + d_size = np.dtype(dtype).itemsize * np.prod(adt.shape) + shm = shared_memory.SharedMemory( + create=True, + size=d_size, + name=f"npshared{property_name}", + ) + dst = np.ndarray( + shape=adt.shape, dtype=dtype, buffer=shm.buf + ) + dst[:] = adt[:] + ''' converted_agent_data_tensors.append(adt) return converted_agent_data_tensors @@ -252,12 +255,15 @@ def update_agents_properties( dt = equal_side_agent_data_tensors[i] else: dt = equal_side_agent_data_tensors[i] - # Free shared memory - shm = shared_memory.SharedMemory( - name=f"npshared{property_name}" - ) - shm.close() - shm.unlink() + # TODO Free shared memory + '''if False: + if dt.size != 0: + shm = shared_memory.SharedMemory( + name=f"npshared{property_name}" + ) + shm.close() + shm.unlink() + ''' self._property_name_2_agent_data_tensor[ property_names[i] ] = compress_tensor(dt) diff --git a/superneuroabm/core/model.py b/superneuroabm/core/model.py index 1fea71c..b3d020b 100755 --- a/superneuroabm/core/model.py +++ b/superneuroabm/core/model.py @@ -7,9 +7,9 @@ import heapq from multiprocessing import Pool, Manager import inspect +import pickle from numba import cuda -from numba.cuda.random import create_xoroshiro128p_states from tqdm import tqdm from 
superneuroabm.core.agent import AgentFactory, Breed @@ -18,19 +18,8 @@ class Model: THREADSPERBLOCK = 32 - def __init__(self, name: str = "Untitled", use_cuda: bool = True) -> None: - self._name = name + def __init__(self) -> None: self._agent_factory = AgentFactory() - self._use_cuda = use_cuda - if self._use_cuda: - if not cuda.is_available(): - raise EnvironmentError( - "CUDA requested but no cuda installation detected." - ) - else: - pass - # device = cuda.get_current_device() - # device.reset() def register_breed(self, breed: Breed) -> None: if self._agent_factory.num_agents > 0: @@ -83,7 +72,20 @@ def set_agent_property_value( def get_agents_with(self, query: Callable) -> Set[List[Any]]: return self._agent_factory.get_agents_with(query=query) - def setup(self) -> None: + def setup(self, use_cuda: bool = True) -> None: + """ + Must be called before first simulate call. + Initializes model and resets ticks. Readies step functions + and for breeds. + + :param use_cuda: runs model in GPU mode. + """ + self._use_cuda = use_cuda + if self._use_cuda: + if not cuda.is_available(): + raise EnvironmentError( + "CUDA requested but no cuda installation detected." 
+ ) # Create record of agent step functions by breed and priority self._breed_idx_2_step_func_by_priority: List[Dict[int, Callable]] = [] heap_priority_breedidx_func = [] @@ -110,16 +112,16 @@ def setup(self) -> None: ) last_priority = priority - # Generate agent data tensor - self._agent_data_tensors = ( - self._agent_factory.generate_agent_data_tensors(self._use_cuda) - ) + # Generate global data tensor + self._global_data_vector = [0] # index 0 reserved for step def simulate( self, ticks: int, update_data_ticks: int = 1, num_cpu_proc: int = 4 ) -> None: - # Generate global data tensor - self._global_data_vector = [0] # index 0 reserved for step + # Generate agent data tensor + self._agent_data_tensors = ( + self._agent_factory.generate_agent_data_tensors(self._use_cuda) + ) if not self._use_cuda: with Pool(num_cpu_proc) as pool: with Manager() as manager: @@ -155,9 +157,10 @@ def simulate( unit_divisor=1000, dynamic_ncols=True, ): - shared_global_data_vector[0] = tick for jobs_in_priority in jobs: _ = list(map(smap, jobs_in_priority)) + shared_global_data_vector[0] += 1 + self._global_data_vector = list(shared_global_data_vector) else: blockspergrid = int( math.ceil( @@ -169,20 +172,39 @@ def simulate( self._breed_idx_2_step_func_by_priority, ticks, ) - ################ - shared_global_data_vector = [0] device_global_data_vector = cuda.to_device( - shared_global_data_vector + self._global_data_vector ) exec(step_funcs_code_obj) + # Globals will have to be moved explicitly + self._global_data_vector = device_global_data_vector.copy_to_host() self._agent_factory.update_agents_properties( self._agent_data_tensors, self._use_cuda - ) + ) # internals of this will automatically move data back from GPU - @property - def name(self) -> str: - return self._name + def save(self, app: "Model", fpath: str) -> None: + """ + Saves model. Must be overridden if additional data + pertaining to application must be saved. 
+ + :param fpath: file path to save pickle file at + :param app_data: additional application data to be saved. + """ + if "_agent_data_tensors" in app.__dict__: + del app.__dict__["_agent_data_tensors"] + with open(fpath, "wb") as fout: + pickle.dump(app, fout) + + def load(self, fpath: str) -> "Model": + """ + Loads model from pickle file. + + :param fpath: file path to pickle file. + """ + with open(fpath, "rb") as fin: + app = pickle.load(fin) + return app def smap(func_args): @@ -227,8 +249,11 @@ def generate_gpu_func( g = cuda.cg.this_grid() breed_id = a0[agent_id] for tick in range({ticks}): - device_global_data_vector[0] = tick {sim_loop} + if thread_id == 0: + device_global_data_vector[0] += 1 + g.sync() + \nstepfunc = cuda.jit(stepfunc) \nstepfunc[blockspergrid, Model.THREADSPERBLOCK]( device_global_data_vector, diff --git a/superneuroabm/core/util.py b/superneuroabm/core/util.py index 1ba52f8..7c7d187 100644 --- a/superneuroabm/core/util.py +++ b/superneuroabm/core/util.py @@ -2,6 +2,7 @@ from typing import Iterable, List, Any, Tuple, Optional import numpy as np +from numba import cuda def convert_to_equal_side_tensor( @@ -24,7 +25,7 @@ def find_max_depth(l, curr_depth=0): print("no dims specified... 
running find max depth") find_max_depth(tensor) max_dims = tuple(list(dim2maxlen.values())) - answer = np.full(shape=max_dims, fill_value=np.nan) + answer = np.full(shape=max_dims, fill_value=np.nan, dtype=np.float32) def fill_arr(arr, coord): if len(coord) == len(max_dims): @@ -54,6 +55,8 @@ def compress_tensor(arr: Iterable, level: int = 0): else: new_arr = [] for item in arr: + if type(item) == cuda.cudadrv.devicearray.DeviceNDArray: + item = item.copy_to_host() new_item = compress_tensor(item, level + 1) if ( (not isinstance(new_item, Iterable) and new_item != None) diff --git a/superneuroabm/model.py b/superneuroabm/model.py index 60617c0..f48c86b 100755 --- a/superneuroabm/model.py +++ b/superneuroabm/model.py @@ -10,14 +10,12 @@ from superneuroabm.neuron import ( synapse_step_func, neuron_step_func, - synapse_with_stdp_step_func, ) class NeuromorphicModel(Model): def __init__( self, - use_cuda: bool = False, neuron_breed_info: Dict[str, List[Callable]] = { "Neuron": [neuron_step_func, synapse_step_func] }, @@ -36,7 +34,7 @@ def __init__( breed every simulation step in the order specifed in the list. """ - super().__init__(name="NeuromorphicModel", use_cuda=use_cuda) + super().__init__() axonal_delay = 1 neuron_properties = { @@ -49,6 +47,7 @@ def __init__( "internal_state": 0, "neuron_delay_reg": [0 for _ in range(axonal_delay)], "input_spikes": [], + "output_synapses_learning_params": [], "output_spikes": [], } max_dims = { @@ -61,6 +60,7 @@ def __init__( "internal_state": [], "neuron_delay_reg": None, "input_spikes": None, + "output_synapses_learning_params": None, "output_spikes": None, } @@ -79,10 +79,33 @@ def __init__( self._neuron_breeds[breed_name] = neuron_breed self._output_synapsess_max_dim = [0, 2] + self._original_output_synapse_weights = {} + self._synapse_index_map = {} + + def setup( + self, + use_cuda: bool = False, + output_buffer_len: int = 1000, + retain_weights=False, + ) -> None: + """ + Resets the simulation and initializes agents. 
+ + :param retain_weights: False by default. If True, updated weights are + not reset upon setup. + """ - def setup(self, output_buffer_len: int = 1000) -> None: neuron_ids = self._agent_factory.num_agents for neuron_id in range(neuron_ids): + # Clear input spikes + new_input_spikes = [] + super().set_agent_property_value( + id=neuron_id, + property_name="input_spikes", + value=new_input_spikes, + dims=[0, 2], + ) + # Clear output buffer output_buffer = [0 for _ in range(output_buffer_len)] super().set_agent_property_value( id=neuron_id, @@ -90,7 +113,54 @@ def setup(self, output_buffer_len: int = 1000) -> None: value=output_buffer, dims=[output_buffer_len], ) - super().setup() + # Clear internal states + reset_state = super().get_agent_property_value( + id=neuron_id, property_name="reset_state" + ) + super().set_agent_property_value( + id=neuron_id, + property_name="internal_state", + value=reset_state, + dims=[], + ) + # Clear neuron delay registers + axonal_delay = len( + super().get_agent_property_value( + id=neuron_id, property_name="neuron_delay_reg" + ) + ) + neuron_delay_reg = [0 for _ in range(axonal_delay)] + self.set_agent_property_value( + neuron_id, "neuron_delay_reg", neuron_delay_reg, [axonal_delay] + ) + # Clear synaptic delay registers + output_synapses = self.get_agent_property_value( + neuron_id, "output_synapses" + ) + max_synapse_info_len = 2 + # Iterate through synapse info lists for this neuron + for i in range(len(output_synapses)): + # Clear weight (2nd element) if necessary + if not retain_weights: + output_synapses[i][ + 1 + ] = self._original_output_synapse_weights[neuron_id][i] + # Clear the rest of the list, which is the delay register + j = 2 + while j < len(output_synapses[i]): + output_synapses[i][j] = 0 + j += 1 + max_synapse_info_len = max( + len(output_synapses[i]), max_synapse_info_len + ) + self.set_agent_property_value( + neuron_id, + "output_synapses", + output_synapses, + [len(output_synapses), max_synapse_info_len], + ) + + 
super().setup(use_cuda=use_cuda) def simulate( self, ticks: int, update_data_ticks: int = 1, num_cpu_proc: int = 4 @@ -128,6 +198,9 @@ def create_neuron( self.set_agent_property_value( neuron_id, "neuron_delay_reg", delay_reg, [axonal_delay] ) + self.set_agent_property_value( + neuron_id, "internal_state", reset_state, [] + ) # synapse_infos = [] # self._output_synapsess.append(synapse_infos) return neuron_id @@ -138,18 +211,41 @@ def create_synapse( post_neuron_id: int, weight: int = 1, synaptic_delay: int = 1, + synapse_learning_params: List[float] = None, ) -> None: """ Creates and adds Synapse agent. + :param pre_neuron_id: int presynaptic neuron id + :param post_neuron_id: int postsynaptic neuron id + :param weight: weight of synapse + :param synaptic delay: number of timesteps to delay synapse by + :param synapse_learning_params: Optional. Any parameters used in a learning + enabled step function. Must be specified in order of use + in step function. """ - delay_reg = [0 for _ in range(synaptic_delay)] - synapse_info = [post_neuron_id, weight] - synapse_info.extend(delay_reg) - # self._output_synapsess[pre_neuron_id].append(synapse_info) output_synapses = self.get_agent_property_value( pre_neuron_id, "output_synapses" ) + if pre_neuron_id in self._synapse_index_map: + output_synapse_index_map = self._synapse_index_map[pre_neuron_id] + if post_neuron_id in output_synapse_index_map: + raise ValueError( + f"Synapse {pre_neuron_id} -> {post_neuron_id}already exists" + ) + else: + output_synapse_index_map[post_neuron_id] = len(output_synapses) + else: + output_synapse_index_map = {post_neuron_id: 0} + self._synapse_index_map[pre_neuron_id] = output_synapse_index_map + + # Update or enter new synapse params + output_synapses = self.get_agent_property_value( + pre_neuron_id, "output_synapses" + ) + delay_reg = [0 for _ in range(synaptic_delay)] + synapse_info = [post_neuron_id, weight] + synapse_info.extend(delay_reg) output_synapses.append(synapse_info) 
self.set_agent_property_value( pre_neuron_id, @@ -158,16 +254,79 @@ def create_synapse( [len(output_synapses), len(synapse_info)], ) - max_synapses = max( - self._output_synapsess_max_dim[0], - len(output_synapses), + # store original weights + self._original_output_synapse_weights[ + pre_neuron_id + ] = self._original_output_synapse_weights.get(pre_neuron_id, []) + self._original_output_synapse_weights[pre_neuron_id].append(weight) + + # Update or enter learning params + synapses_learning_params = self.get_agent_property_value( + pre_neuron_id, "output_synapses_learning_params" ) - max_delay_reg_len = max( - self._output_synapsess_max_dim[0], len(synapse_info) + if synapse_learning_params == None: + synapse_learning_params = [] + synapses_learning_params.append(synapse_learning_params) + self.set_agent_property_value( + id=pre_neuron_id, + property_name="output_synapses_learning_params", + value=synapses_learning_params, + dims=[ + len(synapses_learning_params), + len(synapse_learning_params), + ], ) - self._output_synapsess_max_dim = [max_synapses, max_delay_reg_len] - def spike(self, neuron_id: int, tick: int, value: float) -> None: + def update_synapse( + self, + pre_neuron_id: int, + post_neuron_id: int, + weight: int = None, + synapse_learning_params: List[float] = None, + ): + if pre_neuron_id in self._synapse_index_map: + output_synapse_index_map = self._synapse_index_map[pre_neuron_id] + if post_neuron_id in output_synapse_index_map: + synapse_idx = output_synapse_index_map[post_neuron_id] + else: + raise ValueError( + f"Synapse {pre_neuron_id} -> {post_neuron_id} does not exist" + ) + else: + raise ValueError( + f"Synapse {pre_neuron_id} -> {post_neuron_id} does not exist" + ) + + # Update new synapse params + if weight != None: + output_synapses = self.get_agent_property_value( + pre_neuron_id, "output_synapses" + ) + output_synapses[synapse_idx][1] = weight + self.set_agent_property_value( + pre_neuron_id, + "output_synapses", + output_synapses, + 
[len(output_synapses), len(output_synapses[synapse_idx])], + ) + + # Update or enter learning params + if synapse_learning_params: + synapses_learning_params = self.get_agent_property_value( + pre_neuron_id, "output_synapses_learning_params" + ) + synapses_learning_params[synapse_idx] = synapse_learning_params + self.set_agent_property_value( + id=pre_neuron_id, + property_name="output_synapses_learning_params", + value=synapses_learning_params, + dims=[ + len(synapses_learning_params), + len(synapse_learning_params), + ], + ) + + def add_spike(self, neuron_id: int, tick: int, value: float) -> None: """ Schedules an external input spike to this Neuron. @@ -183,7 +342,7 @@ def spike(self, neuron_id: int, tick: int, value: float) -> None: neuron_id, "input_spikes", spikes, [len(spikes), 2] ) - def get_spikes(self, neuron_id: int) -> np.array: + def get_spike_times(self, neuron_id: int) -> np.array: spike_train = super().get_agent_property_value( id=neuron_id, property_name="output_spikes", @@ -221,3 +380,9 @@ def summary(self) -> str: ) return "\n".join(summary) + + def save(self, fpath: str): + super().save(self, fpath) + + def load(self, fpath: str): + self = super().load(fpath) diff --git a/superneuroabm/neuron.py b/superneuroabm/neuron.py index b2cb939..433c082 100755 --- a/superneuroabm/neuron.py +++ b/superneuroabm/neuron.py @@ -17,6 +17,7 @@ def neuron_step_func( internal_state, neuron_delay_regs, input_spikess, + output_synapses_learning_paramss, output_spikess, my_idx, ): @@ -31,7 +32,11 @@ def neuron_step_func( for spike in input_spikess[my_idx]: if spike[0] == t_current: internal_state[my_idx] += spike[1] - internal_state[my_idx] -= leaks[my_idx] + internal_state[my_idx] = ( + internal_state[my_idx] - leaks[my_idx] + if internal_state[my_idx] - leaks[my_idx] > reset_states[my_idx] + else reset_states[my_idx] + ) # Apply axonal delay to the output of the spike: # Shift the register each time step @@ -85,6 +90,7 @@ def synapse_step_func( internal_state, 
neuron_delay_regs, input_spikess, + output_synapses_learning_paramss, output_spikess, my_idx, ): @@ -95,6 +101,10 @@ def synapse_step_func( out_neuron_id = out_synapse_info[0] if math.isnan(out_neuron_id): break + out_neuron_id = int(out_neuron_id) + # If out_neuron still in refractory period, return + if t_elapses[out_neuron_id] > 0: + continue weight = out_synapse_info[1] synapse_register = out_synapse_info[2:] # Check if delayed Vm was over threshold, if so spike @@ -114,7 +124,7 @@ def synapse_step_func( ) Vm = synapse_register[syn_delay_reg_tail] if not math.isnan(Vm) and Vm != 0: - internal_state[int(out_neuron_id)] += weight + internal_state[out_neuron_id] += weight def synapse_with_stdp_step_func( @@ -129,6 +139,7 @@ def synapse_with_stdp_step_func( internal_state, neuron_delay_regs, input_spikess, + output_synapses_learning_paramss, output_spikess, my_idx, ): @@ -140,6 +151,7 @@ def synapse_with_stdp_step_func( out_neuron_id = out_synapse_info[0] if math.isnan(out_neuron_id): break + out_neuron_id = int(out_neuron_id) weight = out_synapse_info[1] synapse_register = out_synapse_info[2:] # Check if delayed Vm was over threshold, if so spike @@ -173,16 +185,19 @@ def synapse_with_stdp_step_func( # Perform STDP weight change if t_current < 2: return - stdp_timesteps = 3 - A_pos = 0.6 - A_neg = 0.3 - tau_pos = 8 - tau_neg = 5 + output_synapse_learning_params = output_synapses_learning_paramss[ + my_idx + ][synapse_idx] + stdp_timesteps = output_synapse_learning_params[0] + A_pos = output_synapse_learning_params[1] # 0.6 + A_neg = output_synapse_learning_params[2] # 0.3 + tau_pos = output_synapse_learning_params[3] # 8 + tau_neg = output_synapse_learning_params[4] # 5 presynaptic_spikes = output_spikess[my_idx] postsynaptic_spikes = output_spikess[int(out_neuron_id)] delta_w = 0 - for delta_t in range(stdp_timesteps): + for delta_t in range(int(stdp_timesteps)): pre_to_post_correlation = ( presynaptic_spikes[t_current - 1 - delta_t] * 
postsynaptic_spikes[t_current] diff --git a/tests/basic_test.py b/tests/basic_test.py new file mode 100644 index 0000000..6e3855c --- /dev/null +++ b/tests/basic_test.py @@ -0,0 +1,290 @@ +import unittest + +from superneuroabm.model import NeuromorphicModel +from superneuroabm.neuron import ( + neuron_step_func, + synapse_step_func, + synapse_with_stdp_step_func, +) + + +class BasicTest(unittest.TestCase): + """ + Tests Spike Time Dependent Plasticity learning + + """ + + def __init__(self, methodName: str = ...) -> None: + super().__init__(methodName) + + def test_consecutive_simulate_calls(self): + """Tests simulate calls""" + + model = NeuromorphicModel() + # Create neurons + input = model.create_neuron(threshold=0.0) + output = model.create_neuron(threshold=0.0) + + # Create synapses + model.create_synapse( + pre_neuron_id=input, post_neuron_id=output, weight=1.0 + ) + + # Setup and simulate + model.setup(output_buffer_len=10, use_cuda=True) + spikes = {1: [(input, 1)], 6: [(input, 100)]} + for time in spikes: + for neuron, value in spikes[time]: + model.add_spike(neuron_id=neuron, tick=time, value=value) + + model.simulate(ticks=6) + print(model.summary()) + assert model._global_data_vector[0] == 6, "Ticks are off" + assert model.get_spike_times(output) == [ + 2 + ], "Simulation did not continue properly" + + model.simulate(ticks=4) + print(model.summary()) + assert ( + model._global_data_vector[0] == 10 + ), "Ticks didn't continue correctly on second simulate call" + assert model.get_spike_times(output) == [ + 2, + 7, + ], "Simulation did not continue properly" + + def test_consecutive_setup_and_simulate_calls(self): + """Tests setup and simulate calls""" + + model = NeuromorphicModel() + # Create neurons + input = model.create_neuron(threshold=0.0) + output = model.create_neuron(threshold=0.0) + + # Create synapses + model.create_synapse( + pre_neuron_id=input, post_neuron_id=output, weight=1.0 + ) + # Setup and simulate + model.setup(output_buffer_len=10, 
use_cuda=True) + spikes = {1: [(input, 1)], 6: [(input, 100)]} + for time in spikes: + for neuron, value in spikes[time]: + model.add_spike(neuron_id=neuron, tick=time, value=value) + + model.simulate(ticks=6) + print(model.summary()) + assert model._global_data_vector[0] == 6, "Ticks are off" + assert model.get_spike_times(output) == [ + 2 + ], "Simulation did not execute properly" + + model.setup(output_buffer_len=10, use_cuda=True) + spikes = {1: [(input, 1)], 6: [(input, 100)]} + for time in spikes: + for neuron, value in spikes[time]: + model.add_spike(neuron_id=neuron, tick=time, value=value) + model.simulate(ticks=4) + print(model.summary()) + assert ( + model._global_data_vector[0] == 4 + ), "Ticks didn't continue correctly on second simulate call" + + assert model.get_spike_times(output) == [ + 2, + ], "Simulation did not reset properly" + + def test_breeds(self): + """Tests multi-breed""" + + neuron_breed_info = { + "Neuron": [neuron_step_func, synapse_step_func], + "NeuronSTDPForward": [ + neuron_step_func, + synapse_with_stdp_step_func, + ], + } + model = NeuromorphicModel(neuron_breed_info=neuron_breed_info) + # Create neurons + input_0 = model.create_neuron(breed="NeuronSTDPForward", threshold=0.0) + input_1 = model.create_neuron(breed="Neuron", threshold=0.0) + output_2 = model.create_neuron(breed="Neuron", threshold=2.0) + + # Create synapses + model.create_synapse( + pre_neuron_id=input_0, + post_neuron_id=output_2, + weight=1.0, + synapse_learning_params=[3, 0.6, 0.3, 8, 5, 0.8, 1000], + ) + model.create_synapse( + pre_neuron_id=input_1, post_neuron_id=output_2, weight=1.0 + ) + + # Setup and simulate + model.setup(output_buffer_len=10, use_cuda=True) + + spikes = { + 1: [ + (input_0, 0), + (input_1, 0), + ], # Input: (0, 0); Expected output: No spike + 3: [ + (input_0, 0), + (input_1, 1), + ], # Input: (0, 1); Expected output: Spike at time 4 + 5: [ + (input_0, 1), + (input_1, 0), + ], # Input: (1, 0); Expected output: Spike at time 6 + 7: [ + 
(input_0, 1), + (input_1, 1), + ], # Input: (1, 1); Expected output: Spike at time 8 + } + for time in spikes: + for neuron, value in spikes[time]: + model.add_spike(neuron_id=neuron, tick=time, value=value) + + model.simulate(ticks=10) + print(model.summary()) + assert ( + round(model.get_synapse_weight(0, 2)) != 1 + ), "Breed had no effect" + assert model.get_synapse_weight(1, 2) == 1, "Breed had no effect" + + def test_save_load(self): + neuron_breed_info = { + "Neuron": [neuron_step_func, synapse_step_func], + "NeuronSTDPForward": [ + neuron_step_func, + synapse_with_stdp_step_func, + ], + } + model = NeuromorphicModel(neuron_breed_info=neuron_breed_info) + # Create neurons + input_0 = model.create_neuron(breed="NeuronSTDPForward", threshold=0.0) + input_1 = model.create_neuron(breed="Neuron", threshold=0.0) + output_2 = model.create_neuron(breed="Neuron", threshold=2.0) + + # Create synapses + model.create_synapse( + pre_neuron_id=input_0, + post_neuron_id=output_2, + weight=1.0, + synapse_learning_params=[3, 0.6, 0.3, 8, 5, 0.8, 1000], + ) + model.create_synapse( + pre_neuron_id=input_1, post_neuron_id=output_2, weight=5.0 + ) + model.setup(use_cuda=True, output_buffer_len=10) + + spikes = { + 7: [ + (input_0, 1), + (input_1, 1), + ], # Expected output: Spike at time 8 + } + for time in spikes: + for neuron, value in spikes[time]: + model.add_spike(neuron_id=neuron, tick=time, value=value) + + # Setup and simulate + model.simulate(ticks=10) + print(model.summary()) + model.save("model.pickle") + model.load("model.pickle") + print(model.summary()) + assert ( + round(model.get_synapse_weight(input_0, output_2)) == 241 + ), "STDP enabled synapse failed" + assert ( + model.get_synapse_weight(input_1, output_2) == 5 + ), "Weight change on STDP disabled synapse" + + model.save("model.pickle") + model.load("model.pickle") + model.update_synapse( + input_0, output_2, synapse_learning_params=[0, 0, 0, 0, 0, 0, 0] + ) + # Setup and simulate + model.setup(use_cuda=True, 
output_buffer_len=10, retain_weights=True) + spikes = { + 3: [ + (input_0, 1), + ], # Expected output: Spike at time 8 + } + for time in spikes: + for neuron, value in spikes[time]: + model.add_spike(neuron_id=neuron, tick=time, value=value) + + model.simulate(ticks=10) + print(model.summary()) + assert model.get_spike_times(output_2) == [ + 4 + ], "Trained and loaded model did not spike correctly" + + def test_leak_to_reset(self): + """Tests if leak does not cause neuron internal state to drop beyond reset state""" + + model = NeuromorphicModel() + # Create neurons + input = model.create_neuron(threshold=0.0) + output1_reset_state = -1 + output1 = model.create_neuron( + threshold=0.0, leak=1, reset_state=output1_reset_state + ) + output2 = model.create_neuron(threshold=0.0, leak=1) + + # Create synapses + model.create_synapse( + pre_neuron_id=input, post_neuron_id=output1, weight=10.0 + ) + model.create_synapse( + pre_neuron_id=input, post_neuron_id=output2, weight=10.0 + ) + # Setup and simulate + model.setup(output_buffer_len=10, use_cuda=True) + spikes = {1: [(input, 1)]} + for time in spikes: + for neuron, value in spikes[time]: + model.add_spike(neuron_id=neuron, tick=time, value=value) + + model.simulate(ticks=10) + + assert ( + model.get_agent_property_value(output1, "internal_state") + == output1_reset_state + ) + assert model.get_agent_property_value( + output2, "internal_state" + ) == model.get_agent_property_value(output2, "reset_state") + + def test_refactory_period(self): + """Tests if refactory periods greater than 1 work""" + + model = NeuromorphicModel() + # Create neurons + input = model.create_neuron(threshold=0.0) + output1 = model.create_neuron(threshold=0.0) + output2 = model.create_neuron(threshold=0.0, refractory_period=5) + + # Create synapses + model.create_synapse( + pre_neuron_id=input, post_neuron_id=output1, weight=10.0 + ) + model.create_synapse( + pre_neuron_id=input, post_neuron_id=output2, weight=10.0 + ) + # Setup and simulate + 
model.setup(output_buffer_len=10, use_cuda=True) + spikes = {1: [(input, 1)], 3: [(input, 1)]} + for time in spikes: + for neuron, value in spikes[time]: + model.add_spike(neuron_id=neuron, tick=time, value=value) + + model.simulate(ticks=10) + + assert model.get_spike_times(output1) == [2, 4] + assert model.get_spike_times(output2) == [2] diff --git a/tests/learning_test.py b/tests/learning_test.py new file mode 100644 index 0000000..4a604eb --- /dev/null +++ b/tests/learning_test.py @@ -0,0 +1,306 @@ +import unittest + +from superneuroabm.model import NeuromorphicModel +from superneuroabm.neuron import ( + neuron_step_func, + synapse_step_func, + synapse_with_stdp_step_func, +) + + +class LearningTest(unittest.TestCase): + """ + Tests Spike Time Dependent Plasticity learning + + """ + + def __init__(self, methodName: str = ...) -> None: + super().__init__(methodName) + # Create NeuromorphicModel + neuron_breed_info = { + "Neuron": [neuron_step_func, synapse_step_func], + "NeuronSTDPForward": [ + neuron_step_func, + synapse_with_stdp_step_func, + ], + } + self._model = NeuromorphicModel(neuron_breed_info=neuron_breed_info) + + def test_stdp(self): + """Tests STDP""" + + # Create neurons + input_0 = self._model.create_neuron( + breed="NeuronSTDPForward", threshold=0.0 + ) + input_1 = self._model.create_neuron(breed="Neuron", threshold=0.0) + output_2 = self._model.create_neuron(threshold=2.0) + + # Create synapses + self._model.create_synapse( + pre_neuron_id=input_0, + post_neuron_id=output_2, + weight=1.0, + synapse_learning_params=[3, 0.6, 0.3, 8, 5, 0.8, 1000], + # stdp_timesteps, A_pos, A_neg, tau_pos, tau_neg, sigma, w_max + ) + + self._model.create_synapse( + pre_neuron_id=input_1, post_neuron_id=output_2, weight=1.0 + ) + # Setup and simulate + self._model.setup(output_buffer_len=10, use_cuda=True) + spikes = { + 1: [ + (input_0, 0), + (input_1, 0), + ], # Input: (0, 0); Expected output: No spike + 3: [ + (input_0, 0), + (input_1, 1), + ], # Input: (0, 1); 
Expected output: Spike at time 4 + 5: [ + (input_0, 1), + (input_1, 0), + ], # Input: (1, 0); Expected output: Spike at time 6 + 7: [ + (input_0, 1), + (input_1, 1), + ], # Input: (1, 1); Expected output: Spike at time 8 + } + expected_times = [8] + for time in spikes: + for neuron, value in spikes[time]: + self._model.add_spike(neuron_id=neuron, tick=time, value=value) + + self._model.simulate(ticks=10) + print(self._model.summary()) + assert ( + round(self._model.get_synapse_weight(0, 2)) == 499 + ), "STDP weight update failed" + assert ( + self._model.get_synapse_weight(1, 2) == 1 + ), "STDP weight update failed" + + def test_learning_per_synapse(self): + """Tests if learning can be turned on or off per synapse""" + + # Create neurons + input_0 = self._model.create_neuron( + breed="NeuronSTDPForward", threshold=0.0 + ) + input_1 = self._model.create_neuron(breed="Neuron", threshold=0.0) + input_2 = self._model.create_neuron( + breed="NeuronSTDPForward", threshold=0.0 + ) + output = self._model.create_neuron(threshold=2.0) + + # Create synapses + self._model.create_synapse( + pre_neuron_id=input_0, + post_neuron_id=output, + weight=1.0, + synapse_learning_params=[3, 0.6, 0.3, 8, 5, 0.8, 1000], + # stdp_timesteps, A_pos, A_neg, tau_pos, tau_neg, sigma, w_max + ) + + self._model.create_synapse( + pre_neuron_id=input_1, post_neuron_id=output, weight=1.0 + ) + + self._model.create_synapse( + pre_neuron_id=input_2, + post_neuron_id=output, + weight=1.0, + synapse_learning_params=[0, 0.6, 0.3, 8, 5, 0.8, 1000], + # stdp_timesteps, A_pos, A_neg, tau_pos, tau_neg, sigma, w_max + ) + # Setup and simulate + self._model.setup(output_buffer_len=10, use_cuda=True) + spikes = { + 1: [ + (input_0, 0), + (input_1, 0), + (input_2, 0), + ], # Input: (0, 0); Expected output: No spike + 3: [ + (input_0, 0), + (input_1, 1), + ], # Input: (0, 1); Expected output: Spike at time 4 + 5: [ + (input_0, 1), + (input_1, 0), + (input_2, 1), + ], # Input: (1, 0); Expected output: Spike at time 
6 + 7: [ + (input_0, 1), + (input_1, 1), + (input_2, 1), + ], # Input: (1, 1); Expected output: Spike at time 8 + } + for time in spikes: + for neuron, value in spikes[time]: + self._model.add_spike(neuron_id=neuron, tick=time, value=value) + + self._model.simulate(ticks=10) + print(self._model.summary()) + assert ( + round(self._model.get_synapse_weight(input_0, output)) == 499 + ), "STDP weight update failed" + assert ( + self._model.get_synapse_weight(input_1, output) == 1 + ), "STDP weight updated despite not specified" + assert ( + self._model.get_synapse_weight(input_2, output) == 1 + ), "STDP suppression failed" + + def test_learning_on_off_consecutively(self): + """Tests if learning can be turned on and off consecutively + across multiple simulation calls for single run. + """ + + # Create neurons + input_0 = self._model.create_neuron( + breed="NeuronSTDPForward", threshold=0.0 + ) + output = self._model.create_neuron(threshold=2.0) + + # Create synapses + self._model.create_synapse( + pre_neuron_id=input_0, + post_neuron_id=output, + weight=5.0, + synapse_learning_params=[3, 0.6, 0.3, 8, 5, 0.8, 1000], + # stdp_timesteps, A_pos, A_neg, tau_pos, tau_neg, sigma, w_max + ) + # Setup and simulate + self._model.setup(output_buffer_len=30, use_cuda=True) + spikes = { + 7: [ + (input_0, 1), + ], # Expected output: Spike at time 8 + 17: [ + (input_0, 1), + ], # Expected output: Spike at time 18 + 27: [ + (input_0, 1), + ], # Expected output: Spike at time 28 + } + for time in spikes: + for neuron, value in spikes[time]: + self._model.add_spike(neuron_id=neuron, tick=time, value=value) + + self._model.simulate(ticks=10) + print(self._model.summary()) + assert ( + round(self._model.get_synapse_weight(input_0, output)) == 244 + ), "STDP weight update failed" + print(self._model.get_synapse_weight(input_0, output)) + + # Now update synapse to turn off STDP + # Create synapses + self._model.update_synapse( + input_0, + output, + synapse_learning_params=[0, 0.6, 0.3, 8, 
5, 0.8, 1000], + # stdp_timesteps, A_pos, A_neg, tau_pos, tau_neg, sigma, w_max + ) + self._model.simulate(ticks=10) + print(self._model.summary()) + print(self._model.get_synapse_weight(input_0, output)) + assert ( + round(self._model.get_synapse_weight(input_0, output)) == 244 + ), "STDP off failed" + + # Finally, try enabling STDP for the rest of the sim again + # Create synapses + self._model.update_synapse( + input_0, + output, + synapse_learning_params=[3, 0.6, 0.3, 8, 5, 0.8, 1000], + # stdp_timesteps, A_pos, A_neg, tau_pos, tau_neg, sigma, w_max + ) + self._model.simulate(ticks=10) + print(self._model.summary()) + print(self._model.get_synapse_weight(input_0, output)) + assert ( + round(self._model.get_synapse_weight(input_0, output)) == 425 + ), "STDP re-enable failed" + + def test_learning_on_setup_then_off(self): + """Tests if learning can be turned on and off + across simulation runs and if updated weights are + retained + """ + + # Create neurons + input = self._model.create_neuron( + breed="NeuronSTDPForward", threshold=0.0 + ) + output_0 = self._model.create_neuron( + breed="NeuronSTDPForward", threshold=0.0 + ) + output_1 = self._model.create_neuron(breed="Neuron", threshold=2.0) + + # Create synapses + self._model.create_synapse( + pre_neuron_id=input, + post_neuron_id=output_0, + weight=5.0, + synapse_learning_params=[0, 0.6, 0.3, 8, 5, 0.8, 1000], + # stdp_timesteps, A_pos, A_neg, tau_pos, tau_neg, sigma, w_max + ) + self._model.create_synapse( + pre_neuron_id=input, + post_neuron_id=output_1, + weight=5.0, + synapse_learning_params=[3, 0.6, 0.3, 8, 5, 0.8, 1000], + # stdp_timesteps, A_pos, A_neg, tau_pos, tau_neg, sigma, w_max + ) + # Setup and simulate + self._model.setup(output_buffer_len=20, use_cuda=True) + spikes = { + 7: [ + (input, 1), + ], # Expected output: Spike at time 8 + 17: [ + (input, 1), + ], # Expected output: Spike at time 18 + } + for time in spikes: + for neuron, value in spikes[time]: + self._model.add_spike(neuron_id=neuron,
tick=time, value=value) + + self._model.simulate(ticks=20) + print(self._model.summary()) + assert ( + round(self._model.get_synapse_weight(input, output_0)) == 5 + ), "STDP suppression failed" + assert ( + round(self._model.get_synapse_weight(input, output_1)) == 425 + ), "STDP weight update failed" + + # Now setup and turn off STDP + # Create synapses + self._model.setup( + output_buffer_len=10, use_cuda=False, retain_weights=True + ) + + for time in spikes: + for neuron, value in spikes[time]: + self._model.add_spike(neuron_id=neuron, tick=time, value=value) + + self._model.update_synapse( + input, + output_1, + synapse_learning_params=[0, 0.6, 0.3, 8, 5, 0.8, 1000], + # stdp_timesteps, A_pos, A_neg, tau_pos, tau_neg, sigma, w_max + ) + self._model.simulate(ticks=10) + print(self._model.summary()) + assert ( + round(self._model.get_synapse_weight(input, output_0)) == 5 + ), "weights not retained across setup" + assert ( + round(self._model.get_synapse_weight(input, output_1)) == 425 + ), "STDP off failed" diff --git a/tests/logic_gates_test.py b/tests/logic_gates_test.py index 28ed787..e379880 100644 --- a/tests/logic_gates_test.py +++ b/tests/logic_gates_test.py @@ -13,7 +13,8 @@ class LogicGatesTest(unittest.TestCase): def __init__(self, methodName: str = ...)
-> None: super().__init__(methodName) # Create NeuromorphicModel - self._model = NeuromorphicModel(use_cuda=False) + self._model = NeuromorphicModel() + self._use_cuda = False def test_two_neurons(self): """Tests working of two neurons""" @@ -27,33 +28,34 @@ def test_two_neurons(self): pre_neuron_id=neuron_0, post_neuron_id=neuron_1 ) + # Setup and simulate + self._model.setup(output_buffer_len=10, use_cuda=self._use_cuda) + # Add spikes spikes = [(1, 1), (2, 1)] for spike in spikes: - self._model.spike( + self._model.add_spike( neuron_id=neuron_0, tick=spike[0], value=spike[1] ) spikes = [(9, 1), (5, 1), (4, 1)] for spike in spikes: - self._model.spike( + self._model.add_spike( neuron_id=neuron_1, tick=spike[0], value=spike[1] ) - # Setup and simulate - self._model.setup(output_buffer_len=10) self._model.simulate(ticks=10) expected_times = [1, 2] - print(self._model.get_spikes(neuron_id=neuron_0)) - print(self._model.get_spikes(neuron_id=neuron_1)) + print(self._model.get_spike_times(neuron_id=neuron_0)) + print(self._model.get_spike_times(neuron_id=neuron_1)) assert ( - self._model.get_spikes(neuron_id=neuron_0) == expected_times - ), f"Spike times are {self._model.get_spikes(neuron_id=neuron_0)} but should be {expected_times}" + self._model.get_spike_times(neuron_id=neuron_0) == expected_times + ), f"Spike times are {self._model.get_spike_times(neuron_id=neuron_0)} but should be {expected_times}" expected_times = [2, 3, 4, 5, 9] assert ( - self._model.get_spikes(neuron_id=neuron_1) == expected_times - ), f"Spike times are {self._model.get_spikes(neuron_id=neuron_1)} but should be {expected_times}" + self._model.get_spike_times(neuron_id=neuron_1) == expected_times + ), f"Spike times are {self._model.get_spike_times(neuron_id=neuron_1)} but should be {expected_times}" def test_or_gate(self): """Builds a neuromorphic circuit for OR gate and simulates it""" @@ -71,6 +73,9 @@ def test_or_gate(self): pre_neuron_id=input_1, post_neuron_id=output_2, weight=1.0 ) + # 
Setup and simulate + self._model.setup(output_buffer_len=10, use_cuda=self._use_cuda) + # test_cases in format time -> ([(Neuron, value), (Neuron, value)] test_cases = { 1: [ @@ -94,16 +99,14 @@ def test_or_gate(self): expected_times = [4, 6, 8] for time in test_cases: for neuron, value in test_cases[time]: - self._model.spike(neuron_id=neuron, tick=time, value=value) + self._model.add_spike(neuron_id=neuron, tick=time, value=value) - # Setup and simulate - self._model.setup(output_buffer_len=10) self._model.simulate(ticks=10) - print(self._model.get_spikes(neuron_id=output_2)) + print(self._model.get_spike_times(neuron_id=output_2)) assert ( - self._model.get_spikes(neuron_id=output_2) == expected_times - ), f"Spike times are {self._model.get_spikes(neuron_id=output_2)} but should be {expected_times}" + self._model.get_spike_times(neuron_id=output_2) == expected_times + ), f"Spike times are {self._model.get_spike_times(neuron_id=output_2)} but should be {expected_times}" def test_and_gate(self): """Builds a neuromorphic circuit for AND gate and simulates it""" @@ -121,6 +124,9 @@ def test_and_gate(self): pre_neuron_id=input_1, post_neuron_id=output_2, weight=1.0 ) + # Setup and simulate + self._model.setup(output_buffer_len=10, use_cuda=self._use_cuda) + # test_cases in format time -> ([(Neuron, value), (Neuron, value)] test_cases = { 1: [ @@ -144,15 +150,13 @@ def test_and_gate(self): expected_times = [8] for time in test_cases: for neuron, value in test_cases[time]: - self._model.spike(neuron_id=neuron, tick=time, value=value) + self._model.add_spike(neuron_id=neuron, tick=time, value=value) - # Setup and simulate - self._model.setup(output_buffer_len=10) self._model.simulate(ticks=10) - print(self._model.get_spikes(neuron_id=output_2)) + print(self._model.get_spike_times(neuron_id=output_2)) assert ( - self._model.get_spikes(neuron_id=output_2) == expected_times - ), f"Spike times are {self._model.get_spikes(neuron_id=output_2)} but should be {expected_times}" 
+ self._model.get_spike_times(neuron_id=output_2) == expected_times + ), f"Spike times are {self._model.get_spike_times(neuron_id=output_2)} but should be {expected_times}" print(self._model.summary()) @@ -160,7 +164,8 @@ def test_and_gate(self): class LogicGatesTestGPU(LogicGatesTest): def __init__(self, methodName: str = ...) -> None: super().__init__(methodName) - self._model = NeuromorphicModel(use_cuda=True) + self._model = NeuromorphicModel() + self._use_cuda = True if __name__ == "__main__": diff --git a/tests/scalability_test.py b/tests/scalability_test.py index 1648323..ca86ffc 100644 --- a/tests/scalability_test.py +++ b/tests/scalability_test.py @@ -21,13 +21,13 @@ class ScaleTest(unittest.TestCase): def test_create_random_snn( self, - n_neurons: int = 10000, + n_neurons: int = 100, connection_prob: float = 0.5, ticks: int = 1000, ): np.random.seed(10) t_init_start = time.time() - model = NeuromorphicModel(use_cuda=True) + model = NeuromorphicModel() t_init_end = time.time() print( f"Time to initialize the model: {t_init_end - t_init_start} seconds." 
@@ -45,13 +45,13 @@ def test_create_random_snn( pre_neuron_id=n_i, post_neuron_id=n_j, weight=wt_val ) + model.setup(output_buffer_len=ticks, use_cuda=True) # Choose a single random neuron to stimulate: # input_id = np.random.randint(0, n_neurons - 1) spike_neuron = randint(a=0, b=n_neurons - 1) print("Input applied to neuron:", spike_neuron) - model.spike(spike_neuron, 1, 400) + model.add_spike(spike_neuron, 1, 400) - model.setup() t_create_end = time.time() print( f"Time to create the random network: {t_create_end - t_create_start} seconds" @@ -68,7 +68,7 @@ def test_create_random_snn( t_mon_start = time.time() # Get spike times for neuron_id in range(n_neurons): - spike_times = model.get_spikes(neuron_id=neuron_id) + spike_times = model.get_spike_times(neuron_id=neuron_id) t_mon_end = time.time() print( f"Time to get spikes for all neurons: {t_mon_end - t_mon_start} seconds" diff --git a/tests/stdp_test.py b/tests/stdp_test.py deleted file mode 100644 index 36dff5c..0000000 --- a/tests/stdp_test.py +++ /dev/null @@ -1,81 +0,0 @@ -import unittest - -from superneuroabm.model import NeuromorphicModel -from superneuroabm.neuron import ( - neuron_step_func, - synapse_step_func, - synapse_with_stdp_step_func, -) - - -class STDPTest(unittest.TestCase): - """ - Tests Spike Time Dependent Plasticity learning - - """ - - def __init__(self, methodName: str = ...) 
-> None: - super().__init__(methodName) - # Create NeuromorphicModel - neuron_breed_info = { - "Neuron": [neuron_step_func, synapse_step_func], - "NeuronSTDPForward": [ - neuron_step_func, - synapse_with_stdp_step_func, - ], - } - self._model = NeuromorphicModel( - use_cuda=True, neuron_breed_info=neuron_breed_info - ) - - def test_stdp(self): - """Tests STDP""" - - # Create neurons - input_0 = self._model.create_neuron( - breed="NeuronSTDPForward", threshold=0.0 - ) - input_1 = self._model.create_neuron(breed="Neuron", threshold=0.0) - output_2 = self._model.create_neuron(threshold=2.0) - - # Create synapses - self._model.create_synapse( - pre_neuron_id=input_0, post_neuron_id=output_2, weight=1.0 - ) - self._model.create_synapse( - pre_neuron_id=input_1, post_neuron_id=output_2, weight=1.0 - ) - - spikes = { - 1: [ - (input_0, 0), - (input_1, 0), - ], # Input: (0, 0); Expected output: No spike - 3: [ - (input_0, 0), - (input_1, 1), - ], # Input: (0, 1); Expected output: Spike at time 4 - 5: [ - (input_0, 1), - (input_1, 0), - ], # Input: (1, 0); Expected output: Spike at time 6 - 7: [ - (input_0, 1), - (input_1, 1), - ], # Input: (1, 1); Expected output: Spike at time 8 - } - expected_times = [8] - for time in spikes: - for neuron, value in spikes[time]: - self._model.spike(neuron_id=neuron, tick=time, value=value) - - # Setup and simulate - self._model.setup(output_buffer_len=10) - self._model.simulate(ticks=10) - print(self._model.summary()) - assert ( - round(self._model.get_synapse_weight(0, 2)) == 499 - ), "STDP weight update failed" - assert ( - self._model.get_synapse_weight(1, 2) == 1 - ), "STDP weight update failed"