Skip to content

Commit

Permalink
change test name due to improper closure
Browse files Browse the repository at this point in the history
  • Loading branch information
LMBooth committed Oct 24, 2023
1 parent 2a0cfd8 commit 69a8626
Show file tree
Hide file tree
Showing 9 changed files with 208 additions and 144 deletions.
45 changes: 16 additions & 29 deletions Tests/test_cli.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,19 @@

import subprocess
import time
import threading

def check_terminate(proc, timeout=10):
start = time.time()
while time.time() - start < timeout:
if proc.poll() is not None:
return
time.sleep(0.1)
proc.terminate()

def test_cli_scripts():
scripts = [
'python /path/to/testPyTorch.py',
'python /path/to/testSimple.py',
'python /path/to/testSklearn.py',
'python /path/to/testTensorflow.py',
'python /path/to/cli.py'
]
def run_cli_command(command):
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
stdout, stderr = process.communicate()
return_code = process.returncode

if return_code == 0:
print(f"Command succeeded with output:\n{stdout.decode('utf-8')}")
else:
print(f"Command failed with error:\n{stderr.decode('utf-8')}")

for script in scripts:
print(f"Running {script}")
proc = subprocess.Popen(script, shell=True)
t = threading.Thread(target=check_terminate, args=(proc,))
t.start()
try:
t.join()
except KeyboardInterrupt:
proc.terminate()
break
# Example usage
def test_cli():
run_cli_command("pybci testSimple --timeout=10")
run_cli_command("pybci testSklearn --timeout=10")
run_cli_command("pybci testPyTorch --timeout=10")
run_cli_command("pybci testTensorflow --timeout=10")
assert True
File renamed without changes.
72 changes: 43 additions & 29 deletions pybci/CliTests/testPyTorch.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
import torch
from torch.utils.data import DataLoader, TensorDataset
from torch import nn
import threading


def main(createPseudoDevice=True, min_epochs_train=4, min_epochs_test=10, num_chs = 8, num_feats = 2, num_classes = 4):
def main(createPseudoDevice=True, min_epochs_train=4, min_epochs_test=10, num_chs = 8, num_feats = 2, num_classes = 4, timeout=None):
if createPseudoDevice:
num_chs = 8 # 8 channels are created in the PseudoLSLGenerator
num_feats = 2 # default is mean freq and rms to keep it simple
Expand Down Expand Up @@ -60,37 +61,49 @@ def PyTorchModel(x_train, x_test, y_train, y_test):
accuracy = correct / len(y_test)
return accuracy, model

bci = PyBCI(minimumEpochsRequired = min_epochs_train, createPseudoDevice=createPseudoDevice, torchModel = PyTorchModel)
while not bci.connected: # check to see if lsl marker and datastream are available
bci.Connect()
time.sleep(1)
bci.TrainMode() # now both marker and datastreams available start training on received epochs
accuracy = 0
test = False
try:
while(True):
if test is False:
currentMarkers = bci.ReceivedMarkerCount() # check to see how many received epochs, if markers sent to close together will be ignored till done processing
time.sleep(0.5) # wait for marker updates
print("Markers received: " + str(currentMarkers) +" Accuracy: " + str(round(accuracy,2)), end=" \r")
if len(currentMarkers) > 1: # check there is more then one marker type received
if min([currentMarkers[key][1] for key in currentMarkers]) > bci.minimumEpochsRequired:
classInfo = bci.CurrentClassifierInfo() # hangs if called too early
accuracy = classInfo["accuracy"]
if min([currentMarkers[key][1] for key in currentMarkers]) > min_epochs_test:
bci.TestMode()
break
else:
markerGuess = bci.CurrentClassifierMarkerGuess() # when in test mode only y_pred returned
guess = [key for key, value in currentMarkers.items() if value[0] == markerGuess]
print("Current marker estimation: " + str(guess), end=" \r")
time.sleep(0.2)
def loop(bci):
while not bci.connected: # check to see if lsl marker and datastream are available
bci.Connect()
time.sleep(1)
bci.TrainMode() # now both marker and datastreams available start training on received epochs
accuracy = 0
test = False
try:
while(True):
if test is False:
currentMarkers = bci.ReceivedMarkerCount() # check to see how many received epochs, if markers sent to close together will be ignored till done processing
time.sleep(0.5) # wait for marker updates
print("Markers received: " + str(currentMarkers) +" Accuracy: " + str(round(accuracy,2)), end=" \r")
if len(currentMarkers) > 1: # check there is more then one marker type received
if min([currentMarkers[key][1] for key in currentMarkers]) > bci.minimumEpochsRequired:
classInfo = bci.CurrentClassifierInfo() # hangs if called too early
accuracy = classInfo["accuracy"]
if min([currentMarkers[key][1] for key in currentMarkers]) > min_epochs_test:
bci.TestMode()
break
else:
markerGuess = bci.CurrentClassifierMarkerGuess() # when in test mode only y_pred returned
guess = [key for key, value in currentMarkers.items() if value[0] == markerGuess]
print("Current marker estimation: " + str(guess), end=" \r")
time.sleep(0.2)

return None
except KeyboardInterrupt: # allow user to break while loop
print("\nLoop interrupted by user.")
return None
except KeyboardInterrupt: # allow user to break while loop
print("\nLoop interrupted by user.")

def stop_after_timeout(bci):
time.sleep(timeout)
print("\nTimeout reached. Stopping threads.")
bci.StopThreads()

bci = PyBCI(minimumEpochsRequired = min_epochs_train, createPseudoDevice=createPseudoDevice, torchModel = PyTorchModel)
main_thread = threading.Thread(target=loop, args=(bci,))
main_thread.start()
if timeout:
timeout_thread = threading.Thread(target=stop_after_timeout, args=(bci,))
timeout_thread.start()
timeout_thread.join()
main_thread.join()

if __name__ == '__main__':
parser = argparse.ArgumentParser(description="PyTorch neural network is used for model and pseudodevice generates 8 channels of 3 marker types and baseline. Similar to the testPytorch.py in the examples folder.")
Expand All @@ -99,6 +112,7 @@ def PyTorchModel(x_train, x_test, y_train, y_test):
parser.add_argument("--min_epochs_test", default=14, type=int, help='Minimum epochs to collect before model testing commences, if less than min_epochs_test defaults to min_epochs_test+1.')
parser.add_argument("--num_chs", default=8, type=int, help='Num of channels in data stream to configure tensorflow model, if PseudoDevice==True defaults to 8.')
parser.add_argument("--num_classes", default=4, type=int, help='Num of classes in marker stream to configure tensorflow model, if PseudoDevice==True defaults to 4.')
parser.add_argument("--timeout", default=None, type=int, help="Timeout in seconds for the script to automatically stop.")

args = parser.parse_args()
main(**vars(args))
73 changes: 46 additions & 27 deletions pybci/CliTests/testSimple.py
Original file line number Diff line number Diff line change
@@ -1,43 +1,62 @@
import argparse
from pybci import PyBCI
import time
import threading

def main(createPseudoDevice=True, min_epochs_train=4, min_epochs_test=10):
def main(createPseudoDevice=True, min_epochs_train=4, min_epochs_test=10, timeout=None):
def loop(bci):
while not bci.connected: # check to see if lsl marker and datastream are available
bci.Connect()
time.sleep(1)
bci.TrainMode() # now both marker and datastreams available start training on received epochs
accuracy = 0
test = False
try:
while(True):
if test is False:
currentMarkers = bci.ReceivedMarkerCount() # check to see how many received epochs, if markers sent to close together will be ignored till done processing
time.sleep(0.5) # wait for marker updates
print("Markers received: " + str(currentMarkers) +" Accuracy: " + str(round(accuracy,2)), end=" \r")
if len(currentMarkers) > 1: # check there is more then one marker type received
if min([currentMarkers[key][1] for key in currentMarkers]) > bci.minimumEpochsRequired:
classInfo = bci.CurrentClassifierInfo() # hangs if called too early
accuracy = classInfo["accuracy"]
if min([currentMarkers[key][1] for key in currentMarkers]) > min_epochs_test:
bci.TestMode()
test = True
else:
markerGuess = bci.CurrentClassifierMarkerGuess() # when in test mode only y_pred returned
guess = [key for key, value in currentMarkers.items() if value[0] == markerGuess]
print("Current marker estimation: " + str(guess), end=" \r")
time.sleep(0.2)
except KeyboardInterrupt: # allow user to break while loop
print("\nLoop interrupted by user.")

def stop_after_timeout(bci):
time.sleep(timeout)
print("\nTimeout reached. Stopping threads.")
bci.StopThreads()

if min_epochs_test <= min_epochs_train:
min_epochs_test = min_epochs_train+1
bci = PyBCI(minimumEpochsRequired = min_epochs_train, createPseudoDevice=createPseudoDevice)
while not bci.connected: # check to see if lsl marker and datastream are available
bci.Connect()
time.sleep(1)
bci.TrainMode() # now both marker and datastreams available start training on received epochs
accuracy = 0
test = False
try:
while(True):
if test is False:
currentMarkers = bci.ReceivedMarkerCount() # check to see how many received epochs, if markers sent to close together will be ignored till done processing
time.sleep(0.5) # wait for marker updates
print("Markers received: " + str(currentMarkers) +" Accuracy: " + str(round(accuracy,2)), end=" \r")
if len(currentMarkers) > 1: # check there is more then one marker type received
if min([currentMarkers[key][1] for key in currentMarkers]) > bci.minimumEpochsRequired:
classInfo = bci.CurrentClassifierInfo() # hangs if called too early
accuracy = classInfo["accuracy"]
if min([currentMarkers[key][1] for key in currentMarkers]) > min_epochs_test:
bci.TestMode()
test = True
else:
markerGuess = bci.CurrentClassifierMarkerGuess() # when in test mode only y_pred returned
guess = [key for key, value in currentMarkers.items() if value[0] == markerGuess]
print("Current marker estimation: " + str(guess), end=" \r")
time.sleep(0.2)
except KeyboardInterrupt: # allow user to break while loop
print("\nLoop interrupted by user.")
main_thread = threading.Thread(target=loop, args=(bci,))
main_thread.start()

if timeout:
timeout_thread = threading.Thread(target=stop_after_timeout, args=(bci,))
timeout_thread.start()
timeout_thread.join()

main_thread.join()

if __name__ == '__main__':
parser = argparse.ArgumentParser(description="Runs simple setup where sklearn support-vector-machine is used for model and pseudodevice generates 8 channels of 3 marker types and a baseline. Similar to the testSimple.py in the examples folder.")
parser.add_argument("--createPseudoDevice", default=True, type=bool, help="Set to True or False to enable or disable pseudo device creation. pseudodevice generates 8 channels of 3 marker types and baseline.")
parser.add_argument("--min_epochs_train", default=4, type=int, help='Minimum epochs to collect before model training commences, must be less than, min_epochs_test. If less than min_epochs_test defaults to min_epochs_test+1.')
parser.add_argument("--min_epochs_test", default=14, type=int, help='Minimum epochs to collect before model testing commences, if less than min_epochs_test defaults to min_epochs_test+1.')
parser.add_argument("--timeout", default=None, type=int, help="Timeout in seconds for the script to automatically stop.")


args = parser.parse_args()
main(**vars(args))
75 changes: 47 additions & 28 deletions pybci/CliTests/testSklearn.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,45 +2,64 @@
import time
import argparse
from sklearn.neural_network import MLPClassifier
import threading

def main(createPseudoDevice=True, min_epochs_train=4, min_epochs_test=10):
def main(createPseudoDevice=True, min_epochs_train=4, min_epochs_test=10, timeout=None):
def loop(bci):
while not bci.connected: # check to see if lsl marker and datastream are available
bci.Connect()
time.sleep(1)
bci.TrainMode() # now both marker and datastreams available start training on received epochs
accuracy = 0
test = False
try:
while(True):
if test is False:
currentMarkers = bci.ReceivedMarkerCount() # check to see how many received epochs, if markers sent to close together will be ignored till done processing
time.sleep(0.5) # wait for marker updates
print("Markers received: " + str(currentMarkers) +" Accuracy: " + str(round(accuracy,2)), end=" \r")
if len(currentMarkers) > 1: # check there is more then one marker type received
if min([currentMarkers[key][1] for key in currentMarkers]) > bci.minimumEpochsRequired:
classInfo = bci.CurrentClassifierInfo() # hangs if called too early
accuracy = classInfo["accuracy"]
if min([currentMarkers[key][1] for key in currentMarkers]) > min_epochs_train:
bci.TestMode()
break
else:
markerGuess = bci.CurrentClassifierMarkerGuess() # when in test mode only y_pred returned
guess = [key for key, value in currentMarkers.items() if value[0] == markerGuess]
print("Current marker estimation: " + str(guess), end=" \r")
time.sleep(0.2)
return None
except KeyboardInterrupt: # allow user to break while loop
print("\nLoop interrupted by user.")

def stop_after_timeout(bci):
time.sleep(timeout)
print("\nTimeout reached. Stopping threads.")
bci.StopThreads()

if min_epochs_test <= min_epochs_train:
min_epochs_test = min_epochs_train+1
clf = MLPClassifier(max_iter = 1000, solver ="lbfgs")#solver=clf, alpha=alpha,hidden_layer_sizes=hid)
if min_epochs_test <= min_epochs_train:
min_epochs_test = min_epochs_train+1
bci = PyBCI(minimumEpochsRequired = min_epochs_train, createPseudoDevice=createPseudoDevice, clf = clf)
while not bci.connected: # check to see if lsl marker and datastream are available
bci.Connect()
time.sleep(1)
bci.TrainMode() # now both marker and datastreams available start training on received epochs
accuracy = 0
test = False
try:
while(True):
if test is False:
currentMarkers = bci.ReceivedMarkerCount() # check to see how many received epochs, if markers sent to close together will be ignored till done processing
time.sleep(0.5) # wait for marker updates
print("Markers received: " + str(currentMarkers) +" Accuracy: " + str(round(accuracy,2)), end=" \r")
if len(currentMarkers) > 1: # check there is more then one marker type received
if min([currentMarkers[key][1] for key in currentMarkers]) > bci.minimumEpochsRequired:
classInfo = bci.CurrentClassifierInfo() # hangs if called too early
accuracy = classInfo["accuracy"]
if min([currentMarkers[key][1] for key in currentMarkers]) > min_epochs_train:
bci.TestMode()
break
else:
markerGuess = bci.CurrentClassifierMarkerGuess() # when in test mode only y_pred returned
guess = [key for key, value in currentMarkers.items() if value[0] == markerGuess]
print("Current marker estimation: " + str(guess), end=" \r")
time.sleep(0.2)
return None
except KeyboardInterrupt: # allow user to break while loop
print("\nLoop interrupted by user.")
main_thread = threading.Thread(target=loop, args=(bci,))
main_thread.start()
if timeout:
timeout_thread = threading.Thread(target=stop_after_timeout, args=(bci,))
timeout_thread.start()
timeout_thread.join()

main_thread.join()

if __name__ == '__main__':
parser = argparse.ArgumentParser(description="Sklearn multi-layer perceptron is used for model and pseudodevice generates 8 channels of 3 marker types and a baseline. Similar to the testSimple.py in the examples folder.")
parser.add_argument("--createPseudoDevice", default=True, type=bool, help="Set to True or False to enable or disable pseudo device creation. pseudodevice generates 8 channels of 3 marker types and baseline.")
parser.add_argument("--min_epochs_train", default=4, type=int, help='Minimum epochs to collect before model training commences, must be less than, min_epochs_test. If less than min_epochs_test defaults to min_epochs_test+1.')
parser.add_argument("--min_epochs_test", default=14, type=int, help='Minimum epochs to collect before model testing commences, if less than min_epochs_test defaults to min_epochs_test+1.')
parser.add_argument("--timeout", default=None, type=int, help="Timeout in seconds for the script to automatically stop.")

args = parser.parse_args()
main(**vars(args))
Loading

0 comments on commit 69a8626

Please sign in to comment.