diff --git a/Java/PoolAndResourceFile/src/main/java/PoolAndResourceFile.java b/Java/PoolAndResourceFile/src/main/java/PoolAndResourceFile.java index 59ed3e8..ca863b9 100644 --- a/Java/PoolAndResourceFile/src/main/java/PoolAndResourceFile.java +++ b/Java/PoolAndResourceFile/src/main/java/PoolAndResourceFile.java @@ -49,7 +49,7 @@ public static void main(String[] argv) { .endpoint(BATCH_URI) .credential(new AzureNamedKeyCredential(BATCH_ACCOUNT, BATCH_ACCESS_KEY)) .buildClient(); - + BlobContainerClient containerClient = createBlobContainerIfNotExists( STORAGE_ACCOUNT_NAME, STORAGE_ACCOUNT_KEY, STORAGE_CONTAINER_NAME); @@ -65,65 +65,64 @@ public static void main(String[] argv) { submitJob(client, containerClient, sharedPool.getId(), jobId, TASK_COUNT); waitForTasksToComplete(client, jobId, Duration.ofMinutes(5)); - System.out.println("\nTask Results"); - System.out.println("------------------------------------------------------"); + System.out.println("\nTask Results"); + System.out.println("------------------------------------------------------"); - PagedIterable tasks = client.listTasks(jobId); - for (BatchTask task : tasks) { - BatchTaskExecutionInfo execution = task.getExecutionInfo(); + PagedIterable tasks = client.listTasks(jobId); + for (BatchTask task : tasks) { + BatchTaskExecutionInfo execution = task.getExecutionInfo(); - if (execution.getFailureInfo() != null) { - System.out.println("Task " + task.getId() + " failed: " + execution.getFailureInfo().getMessage()); - } + if (execution.getFailureInfo() != null) { + System.out.println("Task " + task.getId() + " failed: " + execution.getFailureInfo().getMessage()); + } - String outputFileName = execution.getExitCode() == 0 ? "stdout.txt" : "stderr.txt"; - String fileContent = client.getTaskFile(jobId, task.getId(), outputFileName).toString(); + String outputFileName = execution.getExitCode() == 0 ? "stdout.txt" : "stderr.txt"; + String fileContent = client.getTaskFile(jobId, task.getId(), outputFileName).toString(); - System.out.println("\nTask " + task.getId() + " output (" + outputFileName + "):"); - System.out.println(fileContent); - } + System.out.println("\nTask " + task.getId() + " output (" + outputFileName + "):"); + System.out.println(fileContent); + } - System.out.println("------------------------------------------------------\n"); - // TODO: How do we replace BatchErrorException? - // } catch (BatchErrorException err) { - // printBatchException(err); + System.out.println("------------------------------------------------------\n"); + // TODO: How do we replace BatchErrorException? + // } catch (BatchErrorException err) { + // printBatchException(err); } catch (Exception ex) { ex.printStackTrace(); } finally { - // Clean up resources - if (CLEANUP_JOB) { - try { - System.out.println("Deleting job " + jobId); - client.deleteJob(jobId); - } catch (HttpResponseException err) { - printBatchException(err); - } - } - if (CLEANUP_POOL) { - try { - System.out.println("Deleting pool " + poolId); - client.deletePool(poolId); - } catch (HttpResponseException err) { - printBatchException(err); - } - } - if (CLEANUP_STORAGE_CONTAINER) { - System.out.println("Deleting storage container " + containerClient.getBlobContainerName()); - containerClient.deleteIfExists(); - } + // Clean up resources + if (CLEANUP_JOB) { + try { + System.out.println("Deleting job " + jobId); + client.deleteJob(jobId); + } catch (HttpResponseException err) { + printBatchException(err); + } + } + if (CLEANUP_POOL) { + try { + System.out.println("Deleting pool " + poolId); + client.deletePool(poolId); + } catch (HttpResponseException err) { + printBatchException(err); + } + } + if (CLEANUP_STORAGE_CONTAINER) { + System.out.println("Deleting storage container " + containerClient.getBlobContainerName()); + containerClient.deleteIfExists(); + } } - System.out.println("\nFinished"); - System.exit(0); + System.out.println("\nFinished"); + System.exit(0); } /** * Create a pool if one doesn't already exist with the given ID * - * @param client The Batch client - * @param poolId The ID of the pool to create or look up - * - * @return A newly created or existing pool + * @param client The Batch client + * @param poolId The ID of the pool to create or look up + * @return A newly created or existing pool */ private static BatchPool createPoolIfNotExists(BatchClient client, String poolId) throws InterruptedException, TimeoutException { @@ -158,7 +157,7 @@ private static BatchPool createPoolIfNotExists(BatchClient client, String poolId if (nodeAgentSku == null || image == null) { throw new IllegalArgumentException( - String.format("Unable to find a verified image with publisher '%s' and offer '%s'", osPublisher, osOffer)); + String.format("Unable to find a verified image with publisher '%s' and offer '%s'", osPublisher, osOffer)); } client.createPool(new BatchPoolCreateParameters(poolId, vmSize) @@ -244,112 +243,111 @@ private static BlobContainerClient createBlobContainerIfNotExists(String storage return blobClient.getBlobContainerClient(containerName); } - /** - * Upload a file to a blob container and return an SAS key - * - * @param containerClient The blob container client to use - * @param source The local file to upload - * - * @return An SAS key for the uploaded file - */ - private static String uploadFileToStorage(BlobContainerClient containerClient, File source) throws IOException { - BlockBlobClient blobClient = containerClient.getBlobClient(source.getName()).getBlockBlobClient(); - blobClient.upload(Files.newInputStream(source.toPath()), source.length()); - - // Create SAS with expiry time of 1 day - String sas = blobClient.generateSas(new BlobServiceSasSignatureValues( - OffsetDateTime.now().plusDays(1), - new BlobSasPermission().setReadPermission(true) - )); - - return blobClient.getBlobUrl() + "?" + sas; - } - - /** - * Create a job and add some tasks - * - * @param client The Batch client - * @param containerClient A blob container to upload resource files - * @param poolId The ID of the pool to submit a job - * @param jobId A unique ID for the new job - * @param taskCount How many tasks to add - */ - private static void submitJob(BatchClient client, BlobContainerClient containerClient, String poolId, - String jobId, int taskCount) throws IOException, InterruptedException { - System.out.println("Submitting job " + jobId + " with " + taskCount + " tasks"); - - // Create job - BatchPoolInfo poolInfo = new BatchPoolInfo(); - poolInfo.setPoolId(poolId); - client.createJob(new BatchJobCreateParameters(jobId, poolInfo)); - - // Upload a resource file and make it available in a "resources" subdirectory on nodes - String fileName = "test.txt"; - String localPath = "./" + fileName; - String remotePath = "resources/" + fileName; - String signedUrl = uploadFileToStorage(containerClient, new File(localPath)); - List files = new ArrayList<>(); - files.add(new ResourceFile() - .setHttpUrl(signedUrl) - .setFilePath(remotePath)); - - // Create tasks - List tasks = new ArrayList<>(); - for (int i = 0; i < taskCount; i++) { - tasks.add(new BatchTaskCreateParameters("mytask" + i, "cat " + remotePath) - .setResourceFiles(files)); - } - - // Add the tasks to the job - client.createTasks(jobId, tasks); - } - - /** - * Wait for all tasks in a given job to be completed, or throw an exception on timeout - * - * @param client The Batch client - * @param jobId The ID of the job to poll for completion. - * @param timeout How long to wait for the job to complete before giving up - */ - private static void waitForTasksToComplete(BatchClient client, String jobId, Duration timeout) - throws InterruptedException, TimeoutException { - long startTime = System.currentTimeMillis(); - long elapsedTime = 0L; - - System.out.print("Waiting for tasks to complete (Timeout: " + timeout.getSeconds() / 60 + "m)"); - - while (elapsedTime < timeout.toMillis()) { - PagedIterable taskCollection = client.listTasks(jobId, - new ListBatchTasksOptions().setSelect(Arrays.asList("id", "state"))); - - boolean allComplete = true; - for (BatchTask task : taskCollection) { - if (task.getState() != BatchTaskState.COMPLETED) { - allComplete = false; - break; - } - } - - if (allComplete) { - System.out.println("\nAll tasks completed"); - // All tasks completed - return; - } - - System.out.print("."); - - TimeUnit.SECONDS.sleep(10); - elapsedTime = (new Date()).getTime() - startTime; - } - - System.out.println(); - - throw new TimeoutException("Task did not complete within the specified timeout"); - } - - private static void printBatchException(HttpResponseException err) { - // TODO: How do we get error details? - System.out.printf("HTTP Response error %s%n", err.toString()); + /** + * Upload a file to a blob container and return an SAS key + * + * @param containerClient The blob container client to use + * @param source The local file to upload + * @return An SAS key for the uploaded file + */ + private static String uploadFileToStorage(BlobContainerClient containerClient, File source) throws IOException { + BlockBlobClient blobClient = containerClient.getBlobClient(source.getName()).getBlockBlobClient(); + blobClient.upload(Files.newInputStream(source.toPath()), source.length()); + + // Create SAS with expiry time of 1 day + String sas = blobClient.generateSas(new BlobServiceSasSignatureValues( + OffsetDateTime.now().plusDays(1), + new BlobSasPermission().setReadPermission(true) + )); + + return blobClient.getBlobUrl() + "?" + sas; + } + + /** + * Create a job and add some tasks + * + * @param client The Batch client + * @param containerClient A blob container to upload resource files + * @param poolId The ID of the pool to submit a job + * @param jobId A unique ID for the new job + * @param taskCount How many tasks to add + */ + private static void submitJob(BatchClient client, BlobContainerClient containerClient, String poolId, + String jobId, int taskCount) throws IOException, InterruptedException { + System.out.println("Submitting job " + jobId + " with " + taskCount + " tasks"); + + // Create job + BatchPoolInfo poolInfo = new BatchPoolInfo(); + poolInfo.setPoolId(poolId); + client.createJob(new BatchJobCreateParameters(jobId, poolInfo)); + + // Upload a resource file and make it available in a "resources" subdirectory on nodes + String fileName = "test.txt"; + String localPath = "./" + fileName; + String remotePath = "resources/" + fileName; + String signedUrl = uploadFileToStorage(containerClient, new File(localPath)); + List files = new ArrayList<>(); + files.add(new ResourceFile() + .setHttpUrl(signedUrl) + .setFilePath(remotePath)); + + // Create tasks + List tasks = new ArrayList<>(); + for (int i = 0; i < taskCount; i++) { + tasks.add(new BatchTaskCreateParameters("mytask" + i, "cat " + remotePath) + .setResourceFiles(files)); + } + + // Add the tasks to the job + client.createTasks(jobId, tasks); + } + + /** + * Wait for all tasks in a given job to be completed, or throw an exception on timeout + * + * @param client The Batch client + * @param jobId The ID of the job to poll for completion. + * @param timeout How long to wait for the job to complete before giving up + */ + private static void waitForTasksToComplete(BatchClient client, String jobId, Duration timeout) + throws InterruptedException, TimeoutException { + long startTime = System.currentTimeMillis(); + long elapsedTime = 0L; + + System.out.print("Waiting for tasks to complete (Timeout: " + timeout.getSeconds() / 60 + "m)"); + + while (elapsedTime < timeout.toMillis()) { + PagedIterable taskCollection = client.listTasks(jobId, + new ListBatchTasksOptions().setSelect(Arrays.asList("id", "state"))); + + boolean allComplete = true; + for (BatchTask task : taskCollection) { + if (task.getState() != BatchTaskState.COMPLETED) { + allComplete = false; + break; + } + } + + if (allComplete) { + System.out.println("\nAll tasks completed"); + // All tasks completed + return; + } + + System.out.print("."); + + TimeUnit.SECONDS.sleep(10); + elapsedTime = (new Date()).getTime() - startTime; + } + + System.out.println(); + + throw new TimeoutException("Task did not complete within the specified timeout"); + } + + private static void printBatchException(HttpResponseException err) { + // TODO: How do we get error details? + System.out.printf("HTTP Response error %s%n", err.toString()); // if (err.body() != null) { // System.out.printf("BatchError code = %s, message = %s%n", err.body().code(), // err.body().message().value()); @@ -359,6 +357,6 @@ private static void printBatchException(HttpResponseException err) { // } // } // } - } + } }