Skip to content

Commit

Permalink
curr progress
Browse files Browse the repository at this point in the history
  • Loading branch information
eleanorjboyd committed Jan 8, 2024
1 parent 23c68db commit 9da43cf
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 33 deletions.
21 changes: 21 additions & 0 deletions pythonFiles/testing_tools/socket_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,27 @@ def write(self, data: str):
self._socket.send(request.encode("utf-8"))
# does this also need a flush on the socket?

def read(self, bufsize=1024):
"""Read data from the socket.
Args:
bufsize (int): Number of bytes to read from the socket.
Returns:
data (bytes): Data received from the socket.
"""
if sys.platform == "win32":
return self._reader.read(bufsize)
else:
data = b""
while True:
part = self._socket.recv(bufsize)
data += part
if len(part) < bufsize:
# No more data, or less than bufsize data received
break
return data


class SocketManager(object):
"""Create a socket and connect to the given address.
Expand Down
3 changes: 2 additions & 1 deletion pythonFiles/vscode_pytest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ def pytest_load_initial_conftests(early_config, parser, args):
"as it is required for successful test discovery and execution."
f"TEST_RUN_PIPE = {TEST_RUN_PIPE}\n"
)
print(error_string, file=sys.stderr)
if not TEST_RUN_PIPE:
print(error_string, file=sys.stderr)
if "--collect-only" in args:
global IS_DISCOVERY
IS_DISCOVERY = True
Expand Down
9 changes: 7 additions & 2 deletions pythonFiles/vscode_pytest/run_pytest_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
sys.path.append(os.fspath(script_dir))
sys.path.append(os.fspath(script_dir / "lib" / "python"))
from testing_tools import process_json_util
from testing_tools import socket_manager


# This script handles running pytest via pytest.main(). It is called via run in the
# pytest execution adapter and gets the test_ids to run via stdin and the rest of the
Expand All @@ -25,15 +27,18 @@
# Get the rest of the args to run with pytest.
args = sys.argv[1:]
run_test_ids_pipe = os.environ.get("RUN_TEST_IDS_PIPE")
print("HELLO!", run_test_ids_pipe)
if not run_test_ids_pipe:
print("Error[vscode-pytest]: RUN_TEST_IDS_PIPE env var is not set.")
raw_json = {}
try:
with open(run_test_ids_pipe, "rb") as pipe:
socket_name = os.environ.get("RUN_TEST_IDS_PIPE")
with socket_manager.PipeManager(socket_name) as sock:
print("made it here")
buffer = b""
while True:
# Receive the data from the client
data = pipe.read()
data = sock.read()
if not data:
break

Expand Down
33 changes: 25 additions & 8 deletions src/client/common/pipes/namedPipes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,36 @@ import * as rpc from 'vscode-jsonrpc/node';
import { traceVerbose } from '../../logging';

export interface NamesPipeConnected {
onConnected(): Promise<[rpc.MessageReader, rpc.MessageWriter]>;
onClosed(): Promise<void>;
}

export function createNamedPipeServer(pipeName: string): Promise<NamesPipeConnected> {
export function createNamedPipeServer(
pipeName: string,
connectedCallback: (value: [rpc.MessageReader, rpc.MessageWriter]) => void,
): Promise<NamesPipeConnected> {
traceVerbose(`Creating named pipe server on ${pipeName}`);
let connectResolve: (value: [rpc.MessageReader, rpc.MessageWriter]) => void;
const connected = new Promise<[rpc.MessageReader, rpc.MessageWriter]>((resolve, _reject) => {
connectResolve = resolve;
let closedResolve: () => void;
const closed = new Promise<void>((resolve, _reject) => {
closedResolve = resolve;
});

let connectionCount = 0;
return new Promise((resolve, reject) => {
const server = net.createServer((socket) => {
server.close();
connectResolve([
connectionCount += 1;
console.log('connectionCount +1 = ', connectionCount);
socket.on('close', () => {
connectionCount -= 1;
console.log('connectionCount -1 = ', connectionCount);
if (connectionCount <= 0) {
console.log('all sockets are now closed 0 on the count!, closing resolver?');
server.close();
// this closedResolve calls the dispose method in the clients
closedResolve();
}
});
// not recieving the reader writer for all connections
connectedCallback([
new rpc.SocketMessageReader(socket, 'utf-8'),
new rpc.SocketMessageWriter(socket, 'utf-8'),
]);
Expand All @@ -30,7 +47,7 @@ export function createNamedPipeServer(pipeName: string): Promise<NamesPipeConnec
server.listen(pipeName, () => {
server.removeListener('error', reject);
resolve({
onConnected: () => connected,
onClosed: () => closed,
});
});
});
Expand Down
36 changes: 27 additions & 9 deletions src/client/testing/testController/common/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,8 @@ export function pythonTestAdapterRewriteEnabled(serviceContainer: IServiceContai

export async function startTestIdsNamedPipe(testIds: string[]): Promise<string> {
const pipeName: string = generateRandomPipeName('python-test-ids');
const server = await createNamedPipeServer(pipeName);
server.onConnected().then(([_reader, writer]) => {
// uses callback so the on connect action occurs after the pipe is created
await createNamedPipeServer(pipeName, ([_reader, writer]) => {
traceVerbose('Test Ids named pipe connected');
writer
.write({
Expand All @@ -193,20 +193,23 @@ interface ExecutionResultMessage extends Message {

export async function startRunResultNamedPipe(
callback: (payload: ExecutionTestPayload | EOTTestPayload) => void,
deferredTillAllServerClose: Deferred<void>,
cancellationToken?: CancellationToken,
): Promise<{ name: string } & Disposable> {
const pipeName: string = generateRandomPipeName('python-test-results');
const server = await createNamedPipeServer(pipeName);
let dispose: () => void = () => {
/* noop */
};
server.onConnected().then(([reader, _writer]) => {
const server = await createNamedPipeServer(pipeName, ([reader, _writer]) => {
traceVerbose(`Test Result named pipe ${pipeName} connected`);
let disposables: (Disposable | undefined)[] = [reader];

dispose = () => {
traceVerbose(`Test Result named pipe ${pipeName} disposed`);
console.log(`Test Result named pipe ${pipeName} disposed`);
disposables.forEach((d) => d?.dispose());
disposables = [];
deferredTillAllServerClose.resolve();
};
disposables.push(
cancellationToken?.onCancellationRequested(() => {
Expand All @@ -218,12 +221,28 @@ export async function startRunResultNamedPipe(
callback((data as ExecutionResultMessage).params as ExecutionTestPayload | EOTTestPayload);
}),
reader.onClose(() => {
callback(createEOTPayload(true));
traceVerbose(`Test Result named pipe ${pipeName} closed`);
dispose();
// reader is still hitting on close, I don't think we want this to happen?

// connectionCount = server.getConnectionCount();
// connectionCount[0] -= 1;
// server.setCurrentConnectionCount(connectionCount);
// if (connectionCount[0] === 0) {
// callback(createEOTPayload(true));
// traceVerbose(`Test Result named pipe ${pipeName} closed? idk how many tuimes tho`);
console.log('reader.onClose');
// dispose();
// } else {
// traceVerbose('Test Result NOT closed, there are still connections');
// }
}),
);
});
server.onClosed().then(() => {
traceVerbose(`Test Result named pipe ${pipeName} closed`);
console.log('server on close from utils');
dispose();
});

return { name: pipeName, dispose };
}

Expand All @@ -236,11 +255,10 @@ export async function startDiscoveryNamedPipe(
cancellationToken?: CancellationToken,
): Promise<{ name: string } & Disposable> {
const pipeName: string = generateRandomPipeName('python-test-discovery');
const server = await createNamedPipeServer(pipeName);
let dispose: () => void = () => {
/* noop */
};
server.onConnected().then(([reader, _writer]) => {
await createNamedPipeServer(pipeName, ([reader, _writer]) => {
traceVerbose(`Test Discovery named pipe ${pipeName} connected`);
let disposables: (Disposable | undefined)[] = [reader];
dispose = () => {
Expand Down
42 changes: 29 additions & 13 deletions src/client/testing/testController/pytest/pytestExecutionAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,32 @@ export class PytestTestExecutionAdapter implements ITestExecutionAdapter {
): Promise<ExecutionTestPayload> {
// deferredTillEOT is resolved when all data sent over payload is received
const deferredTillEOT: Deferred<void> = utils.createTestingDeferred();

const { name, dispose } = await utils.startRunResultNamedPipe((data: ExecutionTestPayload | EOTTestPayload) => {
if ('eot' in data && data.eot === true) {
deferredTillEOT.resolve();
return;
}
if (runInstance && !runInstance.token.isCancellationRequested) {
this.resultResolver?.resolveExecution(data, runInstance);
} else {
traceError(`No run instance found, cannot resolve execution, for workspace ${uri.fsPath}.`);
}
}, runInstance?.token);
const deferredTillAllServerClose: Deferred<void> = utils.createTestingDeferred();

const { name, dispose } = await utils.startRunResultNamedPipe(
(data: ExecutionTestPayload | EOTTestPayload) => {
if ('eot' in data && data.eot === true) {
// this resolves deferredTillEOT after single connection closed
// is there even a way to confirm all data has been sent from all connections?
// this would require tracking EOT # and comparing to connectionCount which seems too hard / unneeded
deferredTillEOT.resolve();
console.log('eot reached');
} else if (runInstance && !runInstance.token.isCancellationRequested) {
this.resultResolver?.resolveExecution(data, runInstance);
console.log('resolve data', data);
} else {
traceError(`No run instance found, cannot resolve execution, for workspace ${uri.fsPath}.`);
}
},
deferredTillAllServerClose,
runInstance?.token,
);
runInstance?.token.onCancellationRequested(() => {
traceInfo(`Test run cancelled, resolving 'till EOT' deferred for ${uri.fsPath}.`);
// if canceled, stop listening for results
deferredTillEOT.resolve();
// if canceled, close the server, resolves the deferredTillAllServerClose
dispose();
});

try {
Expand All @@ -67,8 +78,12 @@ export class PytestTestExecutionAdapter implements ITestExecutionAdapter {
deferredTillEOT,
);
} finally {
// wait for data and all connections to close
await deferredTillEOT.promise;
dispose();
await deferredTillAllServerClose.promise;
console.log("past 'till EOT' promise, going for disposal");
// connectionCount;

traceVerbose('deferredTill EOT resolved');
}

Expand Down Expand Up @@ -219,6 +234,7 @@ export class PytestTestExecutionAdapter implements ITestExecutionAdapter {
// deferredTillEOT is resolved when all data sent on stdout and stderr is received, close event is only called when this occurs
// due to the sync reading of the output.
deferredTillExecClose?.resolve();
console.log('closing deferredTillExecClose');
});
await deferredTillExecClose?.promise;
}
Expand Down

0 comments on commit 9da43cf

Please sign in to comment.