From 7a25b88dfee4f1350fcf4d0d58fe07a1d9a18f22 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Fri, 28 Jun 2024 12:47:45 +0100 Subject: [PATCH] Initial commit --- .dockerignore | 2 + .github/FUNDING.yml | 1 + .github/dependabot.yml | 21 ++ .github/workflows/api-breakage.yml | 22 ++ .github/workflows/ci.yml | 39 +++ .github/workflows/nightly.yml | 21 ++ .github/workflows/validate.yml | 20 ++ .gitignore | 12 + .swiftformat | 26 ++ CODE_OF_CONDUCT.md | 54 ++++ CONTRIBUTING.md | 33 ++ Dockerfile | 18 ++ LICENSE | 201 ++++++++++++ Package.swift | 41 +++ Sources/HummingbirdJobs/Job.swift | 32 ++ Sources/HummingbirdJobs/JobContext.swift | 19 ++ Sources/HummingbirdJobs/JobDefinition.swift | 35 +++ Sources/HummingbirdJobs/JobIdentifier.swift | 42 +++ Sources/HummingbirdJobs/JobParameters.swift | 55 ++++ Sources/HummingbirdJobs/JobQueue.swift | 102 ++++++ Sources/HummingbirdJobs/JobQueueDriver.swift | 43 +++ Sources/HummingbirdJobs/JobQueueError.swift | 36 +++ Sources/HummingbirdJobs/JobQueueHandler.swift | 121 ++++++++ Sources/HummingbirdJobs/JobRegistry.swift | 122 ++++++++ Sources/HummingbirdJobs/MemoryJobQueue.swift | 136 ++++++++ Sources/HummingbirdJobs/QueuedJob.swift | 30 ++ .../DecodableWithUserInfoConfiguration.swift | 51 +++ .../HummingbirdJobsTests.swift | 292 ++++++++++++++++++ scripts/validate.sh | 118 +++++++ 29 files changed, 1745 insertions(+) create mode 100644 .dockerignore create mode 100644 .github/FUNDING.yml create mode 100644 .github/dependabot.yml create mode 100644 .github/workflows/api-breakage.yml create mode 100644 .github/workflows/ci.yml create mode 100644 .github/workflows/nightly.yml create mode 100644 .github/workflows/validate.yml create mode 100644 .gitignore create mode 100644 .swiftformat create mode 100644 CODE_OF_CONDUCT.md create mode 100644 CONTRIBUTING.md create mode 100644 Dockerfile create mode 100644 LICENSE create mode 100644 Package.swift create mode 100644 Sources/HummingbirdJobs/Job.swift create mode 100644 Sources/HummingbirdJobs/JobContext.swift create mode 100644 Sources/HummingbirdJobs/JobDefinition.swift create mode 100644 Sources/HummingbirdJobs/JobIdentifier.swift create mode 100644 Sources/HummingbirdJobs/JobParameters.swift create mode 100644 Sources/HummingbirdJobs/JobQueue.swift create mode 100644 Sources/HummingbirdJobs/JobQueueDriver.swift create mode 100644 Sources/HummingbirdJobs/JobQueueError.swift create mode 100644 Sources/HummingbirdJobs/JobQueueHandler.swift create mode 100644 Sources/HummingbirdJobs/JobRegistry.swift create mode 100644 Sources/HummingbirdJobs/MemoryJobQueue.swift create mode 100644 Sources/HummingbirdJobs/QueuedJob.swift create mode 100644 Sources/HummingbirdJobs/Utils/DecodableWithUserInfoConfiguration.swift create mode 100644 Tests/HummingbirdJobsTests/HummingbirdJobsTests.swift create mode 100755 scripts/validate.sh diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..2fb3343 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,2 @@ +.build +.git \ No newline at end of file diff --git a/.github/FUNDING.yml b/.github/FUNDING.yml new file mode 100644 index 0000000..7fc910b --- /dev/null +++ b/.github/FUNDING.yml @@ -0,0 +1 @@ +github: adam-fowler diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 0000000..a54e14d --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,21 @@ +version: 2 +updates: + - package-ecosystem: "github-actions" + directory: "/" + schedule: + interval: "daily" + groups: + dependencies: + patterns: + - "*" + - package-ecosystem: "swift" + directory: "/" + schedule: + interval: "daily" + open-pull-requests-limit: 5 + allow: + - dependency-type: all + groups: + all-dependencies: + patterns: + - "*" diff --git a/.github/workflows/api-breakage.yml b/.github/workflows/api-breakage.yml new file mode 100644 index 0000000..9919108 --- /dev/null +++ b/.github/workflows/api-breakage.yml @@ -0,0 +1,22 @@ +name: API breaking changes + +on: + pull_request: + +jobs: + linux: + runs-on: ubuntu-latest + timeout-minutes: 15 + container: + image: swift:5.10 + steps: + - name: Checkout + uses: actions/checkout@v4 + with: + fetch-depth: 0 + # https://github.com/actions/checkout/issues/766 + - name: Mark the workspace as safe + run: git config --global --add safe.directory ${GITHUB_WORKSPACE} + - name: API breaking changes + run: | + swift package diagnose-api-breaking-changes origin/${GITHUB_BASE_REF} diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..51adaa2 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,39 @@ +name: CI + +on: + push: + branches: + - main + paths: + - '**.swift' + - '**.yml' + pull_request: + workflow_dispatch: + +jobs: + linux: + runs-on: ubuntu-latest + timeout-minutes: 15 + strategy: + matrix: + image: ["swift:5.9", "swift:5.10", "swiftlang/swift:nightly-6.0-jammy"] + + container: + image: ${{ matrix.image }} + steps: + - name: Checkout + uses: actions/checkout@v4 + - name: Test + run: | + swift test --enable-code-coverage + - name: Convert coverage files + run: | + llvm-cov export -format="lcov" \ + .build/debug/hummingbird-jobsPackageTests.xctest \ + -ignore-filename-regex="\/Tests\/" \ + -ignore-filename-regex="\/Benchmarks\/" \ + -instr-profile .build/debug/codecov/default.profdata > info.lcov + - name: Upload to codecov.io + uses: codecov/codecov-action@v4 + with: + file: info.lcov diff --git a/.github/workflows/nightly.yml b/.github/workflows/nightly.yml new file mode 100644 index 0000000..1009911 --- /dev/null +++ b/.github/workflows/nightly.yml @@ -0,0 +1,21 @@ +name: Swift nightly build + +on: + workflow_dispatch: + +jobs: + linux: + runs-on: ubuntu-latest + timeout-minutes: 15 + strategy: + matrix: + image: ['nightly-focal', 'nightly-jammy', 'nightly-amazonlinux2'] + + container: + image: swiftlang/swift:${{ matrix.image }} + steps: + - name: Checkout + uses: actions/checkout@v4 + - name: Test + run: | + swift test diff --git a/.github/workflows/validate.yml b/.github/workflows/validate.yml new file mode 100644 index 0000000..172da88 --- /dev/null +++ b/.github/workflows/validate.yml @@ -0,0 +1,20 @@ +name: Validity Check + +on: + pull_request: + +jobs: + validate: + runs-on: macOS-latest + timeout-minutes: 15 + steps: + - name: Checkout + uses: actions/checkout@v4 + with: + fetch-depth: 1 + - name: Install Dependencies + run: | + brew install mint + mint install NickLockwood/SwiftFormat@0.51.15 --no-link + - name: run script + run: ./scripts/validate.sh diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..45ca84c --- /dev/null +++ b/.gitignore @@ -0,0 +1,12 @@ +.DS_Store +.build/ +.swiftpm/ +.vscode/ +.devcontainer/ +/Packages +/*.xcodeproj +xcuserdata/ +Package.resolved +/public +/docs +.benchmarkBaselines \ No newline at end of file diff --git a/.swiftformat b/.swiftformat new file mode 100644 index 0000000..beb1d05 --- /dev/null +++ b/.swiftformat @@ -0,0 +1,26 @@ +# Minimum swiftformat version +--minversion 0.51.0 + +# Swift version +--swiftversion 5.9 + +# file options +--exclude .build + +# rules +--disable redundantReturn, extensionAccessControl, typeSugar, conditionalAssignment + +# format options +--ifdef no-indent +--nospaceoperators ...,..< +--patternlet inline +--self insert +--stripunusedargs unnamed-only + +#--maxwidth 150 +--wraparguments before-first +--wrapparameters before-first +--wrapcollections before-first + +#file header +# --header "//===----------------------------------------------------------------------===//\n//\n// This source file is part of the Hummingbird server framework project\n//\n// Copyright (c) {created.year}-{year} the Hummingbird authors\n// Licensed under Apache License v2.0\n//\n// See LICENSE.txt for license information\n// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors\n//\n// SPDX-License-Identifier: Apache-2.0\n//\n//===----------------------------------------------------------------------===//" diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md new file mode 100644 index 0000000..8375921 --- /dev/null +++ b/CODE_OF_CONDUCT.md @@ -0,0 +1,54 @@ +# Code of Conduct + +All developers should feel welcome and encouraged to contribute to Hummingbird. Because of this we have adopted the code of conduct defined by [contributor-covenant.org](https://www.contributor-covenant.org). This document is used across many open source +communities, and we think it articulates our values well. The full text is copied below: + +## Contributor Code of Conduct v1.3 + +As contributors and maintainers of this project, and in the interest of +fostering an open and welcoming community, we pledge to respect all people who +contribute through reporting issues, posting feature requests, updating +documentation, submitting pull requests or patches, and other activities. + +We are committed to making participation in this project a harassment-free +experience for everyone, regardless of level of experience, gender, gender +identity and expression, sexual orientation, disability, personal appearance, +body size, race, ethnicity, age, religion, or nationality. + +Examples of unacceptable behavior by participants include: + +* The use of sexualized language or imagery +* Personal attacks +* Trolling or insulting/derogatory comments +* Public or private harassment +* Publishing other's private information, such as physical or electronic + addresses, without explicit permission +* Other unethical or unprofessional conduct + +Project maintainers have the right and responsibility to remove, edit, or +reject comments, commits, code, wiki edits, issues, and other contributions +that are not aligned to this Code of Conduct, or to ban temporarily or +permanently any contributor for other behaviors that they deem inappropriate, +threatening, offensive, or harmful. + +By adopting this Code of Conduct, project maintainers commit themselves to +fairly and consistently applying these principles to every aspect of managing +this project. Project maintainers who do not follow or enforce the Code of +Conduct may be permanently removed from the project team. + +This Code of Conduct applies both within project spaces and in public spaces +when an individual is representing the project or its community. + +Instances of abusive, harassing, or otherwise unacceptable behavior may be +reported by contacting a project maintainer at [INSERT EMAIL ADDRESS]. All +complaints will be reviewed and investigated and will result in a response that +is deemed necessary and appropriate to the circumstances. Maintainers are +obligated to maintain confidentiality with regard to the reporter of an +incident. + + +This Code of Conduct is adapted from the [Contributor Covenant][homepage], +version 1.3.0, available at https://www.contributor-covenant.org/version/1/3/0/code-of-conduct.html + +[homepage]: https://www.contributor-covenant.org + diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 0000000..bd21b4f --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,33 @@ +# Contributing + +## Legal +By submitting a pull request, you represent that you have the right to license your contribution to the community, and agree by submitting the patch +that your contributions are licensed under the Apache 2.0 license (see [LICENSE](LICENSE)). + +## Contributor Conduct +All contributors are expected to adhere to the project's [Code of Conduct](CODE_OF_CONDUCT.md). + +## Submitting a bug or issue +Please ensure to include the following in your bug report +- A consise description of the issue, what happened and what you expected. +- Simple reproduction steps +- Version of the library you are using +- Contextual information (Swift version, OS etc) + +## Submitting a Pull Request + +Please ensure to include the following in your Pull Request +- A description of what you are trying to do. What the PR provides to the library, additional functionality, fixing a bug etc +- A description of the code changes +- Documentation on how these changes are being tested +- Additional tests to show your code working and to ensure future changes don't break your code. + +Remember the requirements for Hummingbird and HummingbirdCore (No Foundation and no new dependencies). If you are submitting a large change to a module (or bringing in a new dependency) please consider making these changes in a separate repository. The idea is that Hummingbird/HummingbirdCore are kept as slimline as possible. These concerns can be discussed in a Github Issue. + +Please keep your PRs to a minimal number of changes. If a PR is large try to split it up into smaller PRs. Don't move code around unnecessarily it makes comparing old with new very hard. + +The main development branch of the repository is `main`. + +### Formatting + +We use Nick Lockwood's SwiftFormat for formatting code. PRs will not be accepted if they haven't be formatted. The current version of SwiftFormat we are using is v0.51.15. \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..2c1d78c --- /dev/null +++ b/Dockerfile @@ -0,0 +1,18 @@ +# ================================ +# Build image +# ================================ +FROM swift:5.10 as build + +WORKDIR /build + +# First just resolve dependencies. +# This creates a cached layer that can be reused +# as long as your Package.swift/Package.resolved +# files do not change. +COPY ./Package.* ./ +RUN swift package resolve + +# Copy entire repo into container +COPY . . + +RUN swift test --sanitize=thread diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..17644f5 --- /dev/null +++ b/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright 2021 Adam Fowler + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/Package.swift b/Package.swift new file mode 100644 index 0000000..eff7fbd --- /dev/null +++ b/Package.swift @@ -0,0 +1,41 @@ +// swift-tools-version:5.9 +// The swift-tools-version declares the minimum version of Swift required to build this package. + +import PackageDescription + +let swiftSettings: [SwiftSetting] = [.enableExperimentalFeature("StrictConcurrency=complete")] + +let package = Package( + name: "hummingbird", + platforms: [.macOS(.v14), .iOS(.v17), .tvOS(.v17)], + products: [ + .library(name: "HummingbirdJobs", targets: ["HummingbirdJobs"]), + ], + dependencies: [ + .package(url: "https://github.com/apple/swift-atomics.git", from: "1.0.0"), + .package(url: "https://github.com/apple/swift-collections.git", from: "1.0.0"), + .package(url: "https://github.com/apple/swift-log.git", from: "1.4.0"), + .package(url: "https://github.com/apple/swift-nio.git", from: "2.63.0"), + .package(url: "https://github.com/swift-server/swift-service-lifecycle.git", from: "2.0.0"), + ], + targets: [ + .target( + name: "HummingbirdJobs", + dependencies: [ + .product(name: "Collections", package: "swift-collections"), + .product(name: "Logging", package: "swift-log"), + .product(name: "NIOConcurrencyHelpers", package: "swift-nio"), + .product(name: "NIOCore", package: "swift-nio"), + .product(name: "NIOFoundationCompat", package: "swift-nio"), + .product(name: "ServiceLifecycle", package: "swift-service-lifecycle"), + ], + swiftSettings: swiftSettings + ), + // test targets + .testTarget(name: "HummingbirdJobsTests", dependencies: [ + .byName(name: "HummingbirdJobs"), + // .byName(name: "HummingbirdTesting"), + .product(name: "Atomics", package: "swift-atomics"), + ]), + ] +) diff --git a/Sources/HummingbirdJobs/Job.swift b/Sources/HummingbirdJobs/Job.swift new file mode 100644 index 0000000..addf4f4 --- /dev/null +++ b/Sources/HummingbirdJobs/Job.swift @@ -0,0 +1,32 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2024 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +/// Protocol for a Job +protocol Job: Sendable { + /// Parameters job requries + associatedtype Parameters: Codable & Sendable + /// Job Type identifier + var id: JobIdentifier { get } + /// Maximum number of times a job will be retried before being classed as failed + var maxRetryCount: Int { get } + /// Function to execute the job + func execute(context: JobContext) async throws +} + +extension Job { + /// name of job type + public var name: String { + id.name + } +} diff --git a/Sources/HummingbirdJobs/JobContext.swift b/Sources/HummingbirdJobs/JobContext.swift new file mode 100644 index 0000000..35fc12c --- /dev/null +++ b/Sources/HummingbirdJobs/JobContext.swift @@ -0,0 +1,19 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2024 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Logging + +public struct JobContext { + public let logger: Logger +} diff --git a/Sources/HummingbirdJobs/JobDefinition.swift b/Sources/HummingbirdJobs/JobDefinition.swift new file mode 100644 index 0000000..3030777 --- /dev/null +++ b/Sources/HummingbirdJobs/JobDefinition.swift @@ -0,0 +1,35 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2024 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +/// Job definition type +public struct JobDefinition: Sendable { + public let id: JobIdentifier + let maxRetryCount: Int + let _execute: @Sendable (Parameters, JobContext) async throws -> Void + + /// Initialize JobDefinition + /// - Parameters: + /// - id: Job identifier + /// - maxRetryCount: Maxiumum times this job will be retried if it fails + /// - execute: Closure that executes job + public init(id: JobIdentifier, maxRetryCount: Int = 0, execute: @escaping @Sendable (Parameters, JobContext) async throws -> Void) { + self.id = id + self.maxRetryCount = maxRetryCount + self._execute = execute + } + + func execute(_ parameters: Parameters, context: JobContext) async throws { + try await self._execute(parameters, context) + } +} diff --git a/Sources/HummingbirdJobs/JobIdentifier.swift b/Sources/HummingbirdJobs/JobIdentifier.swift new file mode 100644 index 0000000..315dba9 --- /dev/null +++ b/Sources/HummingbirdJobs/JobIdentifier.swift @@ -0,0 +1,42 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2024 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +/// Identifier for a Job type +/// +/// The identifier includes the type of the parameters required by the job to ensure +/// the wrong parameters are not passed to this job +/// +/// Extend this type to include your own job identifiers +/// ``` +/// extension JobIdentifier { +/// static var myJob: Self { .init("my-job") } +/// } +/// ``` +public struct JobIdentifier: Sendable, Hashable, ExpressibleByStringLiteral { + let name: String + /// Initialize a JobIdentifier + /// + /// - Parameters: + /// - name: Unique name for identifier + /// - parameters: Parameter type associated with Job + public init(_ name: String, parameters: Parameters.Type = Parameters.self) { self.name = name } + + /// Initialize a JobIdentifier from a string literal + /// + /// This can only be used in a situation where the Parameter type is defined elsewhere + /// - Parameter string: + public init(stringLiteral string: String) { + self.name = string + } +} diff --git a/Sources/HummingbirdJobs/JobParameters.swift b/Sources/HummingbirdJobs/JobParameters.swift new file mode 100644 index 0000000..d6265e8 --- /dev/null +++ b/Sources/HummingbirdJobs/JobParameters.swift @@ -0,0 +1,55 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2021-2024 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +/// Defines job parameters and identifier +public protocol JobParameters: Codable, Sendable { + /// Job type identifier + static var jobID: String { get } +} + +extension JobQueue { + /// Push Job onto queue + /// - Parameters: + /// - parameters: parameters for the job + /// - Returns: Identifier of queued job + @discardableResult public func push(_ parameters: Parameters) async throws -> Queue.JobID { + return try await self.push(id: .init(Parameters.jobID), parameters: parameters) + } + + /// Register job type + /// - Parameters: + /// - parameters: Job parameter type + /// - maxRetryCount: Maximum number of times job is retried before being flagged as failed + /// - execute: Job code + public func registerJob( + parameters: Parameters.Type = Parameters.self, + maxRetryCount: Int = 0, + execute: @escaping @Sendable ( + Parameters, + JobContext + ) async throws -> Void + ) { + self.registerJob(id: .init(Parameters.jobID), maxRetryCount: maxRetryCount, execute: execute) + } +} + +extension JobDefinition where Parameters: JobParameters { + /// Initialize JobDefinition + /// - Parameters: + /// - maxRetryCount: Maxiumum times this job will be retried if it fails + /// - execute: Closure that executes job + public init(maxRetryCount: Int = 0, execute: @escaping @Sendable (Parameters, JobContext) async throws -> Void) { + self.init(id: .init(Parameters.jobID), maxRetryCount: maxRetryCount, execute: execute) + } +} diff --git a/Sources/HummingbirdJobs/JobQueue.swift b/Sources/HummingbirdJobs/JobQueue.swift new file mode 100644 index 0000000..b3abf18 --- /dev/null +++ b/Sources/HummingbirdJobs/JobQueue.swift @@ -0,0 +1,102 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2021-2024 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Foundation +import Logging +import NIOCore +import NIOFoundationCompat +import ServiceLifecycle + +/// Job queue +/// +/// Wrapper type to bring together a job queue implementation and a job queue +/// handler. Before you can push jobs onto a queue you should register it +/// with the queue via either ``registerJob(id:maxRetryCount:execute:)`` or +/// ``registerJob(_:)``. +public struct JobQueue: Service { + /// underlying driver for queue + public let queue: Queue + let handler: JobQueueHandler + let allocator: ByteBufferAllocator + + public init(_ queue: Queue, numWorkers: Int = 1, logger: Logger) { + self.queue = queue + self.handler = .init(queue: queue, numWorkers: numWorkers, logger: logger) + self.allocator = .init() + } + + /// Push Job onto queue + /// - Parameters: + /// - id: Job identifier + /// - parameters: parameters for the job + /// - Returns: Identifier of queued job + @discardableResult public func push(id: JobIdentifier, parameters: Parameters) async throws -> Queue.JobID { + let jobRequest = JobRequest(id: id, parameters: parameters) + let buffer = try JSONEncoder().encodeAsByteBuffer(jobRequest, allocator: self.allocator) + let id = try await self.queue.push(buffer) + self.handler.logger.debug("Pushed Job", metadata: ["_job_id": .stringConvertible(id), "_job_type": .string(jobRequest.id.name)]) + return id + } + + /// Register job type + /// - Parameters: + /// - id: Job Identifier + /// - maxRetryCount: Maximum number of times job is retried before being flagged as failed + /// - execute: Job code + public func registerJob( + id: JobIdentifier, + maxRetryCount: Int = 0, + execute: @escaping @Sendable ( + Parameters, + JobContext + ) async throws -> Void + ) { + self.handler.logger.info("Registered Job", metadata: ["_job_type": .string(id.name)]) + let job = JobDefinition(id: id, maxRetryCount: maxRetryCount, execute: execute) + self.registerJob(job) + } + + /// Register job type + /// - Parameters: + /// - job: Job definition + public func registerJob(_ job: JobDefinition) { + self.handler.registerJob(job) + } + + /// Run queue handler + public func run() async throws { + try await self.handler.run() + } +} + +extension JobQueue: CustomStringConvertible { + public var description: String { "JobQueue<\(String(describing: Queue.self))>" } +} + +/// Type used internally to encode a request +struct JobRequest: Encodable, Sendable { + let id: JobIdentifier + let parameters: Parameters + + public init(id: JobIdentifier, parameters: Parameters) { + self.id = id + self.parameters = parameters + } + + public func encode(to encoder: Encoder) throws { + var container = encoder.container(keyedBy: _JobCodingKey.self) + let childEncoder = container.superEncoder(forKey: .init(stringValue: self.id.name, intValue: nil)) + try self.parameters.encode(to: childEncoder) + } +} diff --git a/Sources/HummingbirdJobs/JobQueueDriver.swift b/Sources/HummingbirdJobs/JobQueueDriver.swift new file mode 100644 index 0000000..9c86578 --- /dev/null +++ b/Sources/HummingbirdJobs/JobQueueDriver.swift @@ -0,0 +1,43 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2021-2024 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Foundation +import Logging +import NIOCore + +/// Job queue protocol. +/// +/// Defines how to push and pop job data off a queue +public protocol JobQueueDriver: AsyncSequence, Sendable where Element == QueuedJob { + associatedtype JobID: CustomStringConvertible & Sendable + + /// Called when JobQueueHandler is initialised with this queue + func onInit() async throws + /// Push Job onto queue + /// - Returns: Identifier of queued job + func push(_ buffer: ByteBuffer) async throws -> JobID + /// This is called to say job has finished processing and it can be deleted + func finished(jobId: JobID) async throws + /// This is called to say job has failed to run and should be put aside + func failed(jobId: JobID, error: any Error) async throws + /// stop serving jobs + func stop() async + /// shutdown queue + func shutdownGracefully() async +} + +extension JobQueueDriver { + // default version of onInit doing nothing + public func onInit() async throws {} +} diff --git a/Sources/HummingbirdJobs/JobQueueError.swift b/Sources/HummingbirdJobs/JobQueueError.swift new file mode 100644 index 0000000..3105e96 --- /dev/null +++ b/Sources/HummingbirdJobs/JobQueueError.swift @@ -0,0 +1,36 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2021-2024 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +/// Job Queue Error type +public struct JobQueueError: Error, Equatable { + /// failed to decode job. Possibly because it hasn't been registered or data that was expected + /// is not available + public static var decodeJobFailed: Self { .init(.decodeJobFailed) } + /// failed to decode job as the job id is not recognised + public static var unrecognisedJobId: Self { .init(.unrecognisedJobId) } + /// failed to get job from queue + public static var dequeueError: Self { .init(.dequeueError) } + + private enum QueueError { + case decodeJobFailed + case unrecognisedJobId + case dequeueError + } + + private let error: QueueError + + private init(_ error: QueueError) { + self.error = error + } +} diff --git a/Sources/HummingbirdJobs/JobQueueHandler.swift b/Sources/HummingbirdJobs/JobQueueHandler.swift new file mode 100644 index 0000000..30d8e52 --- /dev/null +++ b/Sources/HummingbirdJobs/JobQueueHandler.swift @@ -0,0 +1,121 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2021-2024 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Logging +import ServiceLifecycle + +/// Object handling a single job queue +final class JobQueueHandler: Service { + init(queue: Queue, numWorkers: Int, logger: Logger) { + self.queue = queue + self.numWorkers = numWorkers + self.logger = logger + self.jobRegistry = .init() + } + + /// Register job + /// - Parameters: + /// - id: Job Identifier + /// - maxRetryCount: Maximum number of times job is retried before being flagged as failed + /// - execute: Job code + func registerJob(_ job: JobDefinition) { + self.jobRegistry.registerJob(job: job) + } + + func run() async throws { + try await self.queue.onInit() + + try await withGracefulShutdownHandler { + try await withThrowingTaskGroup(of: Void.self) { group in + var iterator = self.queue.makeAsyncIterator() + for _ in 0..) async throws { + var logger = logger + logger[metadataKey: "_job_id"] = .stringConvertible(queuedJob.id) + let job: any Job + do { + job = try self.jobRegistry.decode(queuedJob.jobBuffer) + } catch let error as JobQueueError where error == .unrecognisedJobId { + logger.debug("Failed to find Job with ID while decoding") + try await self.queue.failed(jobId: queuedJob.id, error: error) + return + } catch { + logger.debug("Job failed to decode") + try await self.queue.failed(jobId: queuedJob.id, error: JobQueueError.decodeJobFailed) + return + } + logger[metadataKey: "_job_type"] = .string(job.name) + + var count = job.maxRetryCount + logger.debug("Starting Job") + + do { + while true { + do { + try await job.execute(context: .init(logger: logger)) + break + } catch let error as CancellationError { + logger.debug("Job cancelled") + // Job failed is called but due to the fact the task is cancelled, depending on the + // job queue driver, the process of failing the job might not occur because itself + // might get cancelled + try await self.queue.failed(jobId: queuedJob.id, error: error) + return + } catch { + if count == 0 { + logger.debug("Job failed") + try await self.queue.failed(jobId: queuedJob.id, error: error) + return + } + count -= 1 + logger.debug("Retrying Job") + } + } + logger.debug("Finished Job") + try await self.queue.finished(jobId: queuedJob.id) + } catch { + logger.debug("Failed to set job status") + } + } + + private let jobRegistry: JobRegistry + private let queue: Queue + private let numWorkers: Int + let logger: Logger +} + +extension JobQueueHandler: CustomStringConvertible { + public var description: String { "JobQueueHandler<\(String(describing: Queue.self))>" } +} diff --git a/Sources/HummingbirdJobs/JobRegistry.swift b/Sources/HummingbirdJobs/JobRegistry.swift new file mode 100644 index 0000000..fd5b96f --- /dev/null +++ b/Sources/HummingbirdJobs/JobRegistry.swift @@ -0,0 +1,122 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2024 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Foundation +import NIOConcurrencyHelpers +import NIOCore + +/// Registry for job types +struct JobRegistry: Sendable { + /// Register job + /// - Parameters: + /// - id: Job Identifier + /// - maxRetryCount: Maximum number of times job is retried before being flagged as failed + /// - execute: Job code + public func registerJob( + job: JobDefinition + ) { + let builder: @Sendable (Decoder) throws -> any Job = { decoder in + let parameters = try Parameters(from: decoder) + return try JobInstance(job: job, parameters: parameters) + } + self.builderTypeMap.withLockedValue { + precondition($0[job.id.name] == nil, "There is a job already registered under id \"\(job.id.name)\"") + $0[job.id.name] = builder + } + } + + func decode(_ buffer: ByteBuffer) throws -> any Job { + return try JSONDecoder().decode(AnyCodableJob.self, from: buffer, userInfoConfiguration: self).job + } + + func decode(from decoder: Decoder) throws -> any Job { + let container = try decoder.container(keyedBy: _JobCodingKey.self) + let key = container.allKeys.first! + let childDecoder = try container.superDecoder(forKey: key) + let jobDefinitionBuilder = try self.builderTypeMap.withLockedValue { + guard let job = $0[key.stringValue] else { throw JobQueueError.unrecognisedJobId } + return job + } + return try jobDefinitionBuilder(childDecoder) + } + + let builderTypeMap: NIOLockedValueBox < [String: @Sendable (Decoder) throws -> any Job]> = .init([:]) +} + +/// Internal job instance type +internal struct JobInstance: Job { + /// job definition + let job: JobDefinition + /// job parameters + let parameters: Parameters + + /// get i + var id: JobIdentifier { self.job.id } + var maxRetryCount: Int { self.job.maxRetryCount } + + func execute(context: JobContext) async throws { + try await self.job.execute(self.parameters, context: context) + } + + init(job: JobDefinition, parameters: Parameters) throws { + self.job = job + self.parameters = parameters + } +} + +/// Add codable support for decoding/encoding any Job +internal struct AnyCodableJob: DecodableWithUserInfoConfiguration, Sendable { + typealias DecodingConfiguration = JobRegistry + + init(from decoder: Decoder, configuration register: DecodingConfiguration) throws { + self.job = try register.decode(from: decoder) + } + + /// Job data + let job: any Job + + /// Initialize a queue job + init(_ job: any Job) { + self.job = job + } + + private enum CodingKeys: String, CodingKey { + case job + } +} + +internal struct _JobCodingKey: CodingKey { + public var stringValue: String + public var intValue: Int? + + public init?(stringValue: String) { + self.stringValue = stringValue + self.intValue = nil + } + + public init?(intValue: Int) { + self.stringValue = "\(intValue)" + self.intValue = intValue + } + + public init(stringValue: String, intValue: Int?) { + self.stringValue = stringValue + self.intValue = intValue + } + + internal init(index: Int) { + self.stringValue = "Index \(index)" + self.intValue = index + } +} diff --git a/Sources/HummingbirdJobs/MemoryJobQueue.swift b/Sources/HummingbirdJobs/MemoryJobQueue.swift new file mode 100644 index 0000000..113d363 --- /dev/null +++ b/Sources/HummingbirdJobs/MemoryJobQueue.swift @@ -0,0 +1,136 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2021-2024 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Collections +import Foundation +import NIOCore + +/// In memory implementation of job queue driver. Stores job data in a circular buffer +public final class MemoryQueue: JobQueueDriver { + public typealias Element = QueuedJob + public typealias JobID = UUID + + /// queue of jobs + fileprivate let queue: Internal + private let onFailedJob: @Sendable (QueuedJob, any Error) -> Void + + /// Initialise In memory job queue + public init(onFailedJob: @escaping @Sendable (QueuedJob, any Error) -> Void = { _, _ in }) { + self.queue = .init() + self.onFailedJob = onFailedJob + } + + /// Stop queue serving more jobs + public func stop() async { + await self.queue.stop() + } + + /// Shutdown queue + public func shutdownGracefully() async { + await self.queue.shutdown() + } + + /// Push job onto queue + /// - Parameters: + /// - job: Job + /// - eventLoop: Eventloop to run process on (ignored in this case) + /// - Returns: Queued job + @discardableResult public func push(_ buffer: ByteBuffer) async throws -> JobID { + return try await self.queue.push(buffer) + } + + public func finished(jobId: JobID) async throws { + await self.queue.clearPendingJob(jobId: jobId) + } + + public func failed(jobId: JobID, error: any Error) async throws { + if let job = await self.queue.clearAndReturnPendingJob(jobId: jobId) { + self.onFailedJob(.init(id: jobId, jobBuffer: job), error) + } + } + + /// Internal actor managing the job queue + fileprivate actor Internal { + var queue: Deque> + var pendingJobs: [JobID: ByteBuffer] + var isStopped: Bool + + init() { + self.queue = .init() + self.isStopped = false + self.pendingJobs = .init() + } + + func push(_ jobBuffer: ByteBuffer) throws -> JobID { + let id = JobID() + self.queue.append(QueuedJob(id: id, jobBuffer: jobBuffer)) + return id + } + + func clearPendingJob(jobId: JobID) { + self.pendingJobs[jobId] = nil + } + + func clearAndReturnPendingJob(jobId: JobID) -> ByteBuffer? { + let instance = self.pendingJobs[jobId] + self.pendingJobs[jobId] = nil + return instance + } + + func next() async throws -> QueuedJob? { + while true { + if self.isStopped { + return nil + } + if let request = queue.popFirst() { + self.pendingJobs[request.id] = request.jobBuffer + return request + } + try await Task.sleep(for: .milliseconds(100)) + } + } + + func stop() { + self.isStopped = true + } + + func shutdown() { + assert(self.pendingJobs.count == 0) + self.isStopped = true + } + } +} + +extension MemoryQueue { + public struct AsyncIterator: AsyncIteratorProtocol { + fileprivate let queue: Internal + + public mutating func next() async throws -> Element? { + try await self.queue.next() + } + } + + public func makeAsyncIterator() -> AsyncIterator { + .init(queue: self.queue) + } +} + +extension JobQueueDriver where Self == MemoryQueue { + /// Return In memory driver for Job Queue + /// - Parameters: + /// - onFailedJob: Closure called when a job fails + public static var memory: MemoryQueue { + .init() + } +} diff --git a/Sources/HummingbirdJobs/QueuedJob.swift b/Sources/HummingbirdJobs/QueuedJob.swift new file mode 100644 index 0000000..507c382 --- /dev/null +++ b/Sources/HummingbirdJobs/QueuedJob.swift @@ -0,0 +1,30 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2021-2024 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Foundation +import NIOCore + +/// Queued job. Includes job data, plus the id for the job +public struct QueuedJob: Sendable { + /// Job instance id + public let id: JobID + /// Job data + public let jobBuffer: ByteBuffer + + /// Initialize a queue job + public init(id: JobID, jobBuffer: ByteBuffer) { + self.jobBuffer = jobBuffer + self.id = id + } +} diff --git a/Sources/HummingbirdJobs/Utils/DecodableWithUserInfoConfiguration.swift b/Sources/HummingbirdJobs/Utils/DecodableWithUserInfoConfiguration.swift new file mode 100644 index 0000000..9747de6 --- /dev/null +++ b/Sources/HummingbirdJobs/Utils/DecodableWithUserInfoConfiguration.swift @@ -0,0 +1,51 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2024 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Foundation +import NIOCore +import NIOFoundationCompat + +/// Implementation of DecodableWithConfiguration which extracts the configuration from the userInfo array +/// +/// This is necessary as Linux Foundation does not have support for setting DecodableWithConfiguration +/// configuration from the JSONDecoder +protocol DecodableWithUserInfoConfiguration: Decodable, DecodableWithConfiguration {} + +/// Implement `init(from: Decoder)`` by extracting configuration from the userInfo dictionary. +extension DecodableWithUserInfoConfiguration { + init(from decoder: Decoder) throws { + guard let configuration = decoder.userInfo[.configuration] as? DecodingConfiguration else { + throw DecodingError.valueNotFound(DecodingConfiguration.self, .init(codingPath: decoder.codingPath, debugDescription: "Failed to find Decoding configuration")) + } + try self.init(from: decoder, configuration: configuration) + } +} + +extension CodingUserInfoKey { + /// Coding UserInfo key used to store DecodableWithUserInfoConfiguration configuration + static var configuration: Self { return .init(rawValue: "_configuration_")! } +} + +extension JSONDecoder { + /// Version of JSONDecoder that sets up configuration userInfo for the DecodableWithUserInfoConfiguration + /// protocol + func decode( + _ type: T.Type, + from buffer: ByteBuffer, + userInfoConfiguration: T.DecodingConfiguration + ) throws -> T where T: DecodableWithUserInfoConfiguration { + self.userInfo[.configuration] = userInfoConfiguration + return try self.decode(type, from: buffer) + } +} diff --git a/Tests/HummingbirdJobsTests/HummingbirdJobsTests.swift b/Tests/HummingbirdJobsTests/HummingbirdJobsTests.swift new file mode 100644 index 0000000..a35d1eb --- /dev/null +++ b/Tests/HummingbirdJobsTests/HummingbirdJobsTests.swift @@ -0,0 +1,292 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2021-2024 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Atomics +import HummingbirdJobs +import Logging +import NIOConcurrencyHelpers +import ServiceLifecycle +import XCTest + +extension XCTestExpectation { + convenience init(description: String, expectedFulfillmentCount: Int) { + self.init(description: description) + self.expectedFulfillmentCount = expectedFulfillmentCount + } +} + +final class HummingbirdJobsTests: XCTestCase { + func wait(for expectations: [XCTestExpectation], timeout: TimeInterval) async { + #if (os(Linux) && swift(<5.9)) || swift(<5.8) + super.wait(for: expectations, timeout: timeout) + #else + await fulfillment(of: expectations, timeout: timeout) + #endif + } + + /// Helper function for test a server + /// + /// Creates test client, runs test function abd ensures everything is + /// shutdown correctly + public func testJobQueue( + _ jobQueue: Service, + _ test: () async throws -> Void + ) async throws { + try await withThrowingTaskGroup(of: Void.self) { group in + let serviceGroup = ServiceGroup( + configuration: .init( + services: [jobQueue], + gracefulShutdownSignals: [.sigterm, .sigint], + logger: Logger(label: "JobQueueService") + ) + ) + group.addTask { + try await serviceGroup.run() + } + try await test() + await serviceGroup.triggerGracefulShutdown() + } + } + + func testBasic() async throws { + let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 10) + let jobQueue = JobQueue(.memory, numWorkers: 1, logger: Logger(label: "HummingbirdJobsTests")) + let job = JobDefinition(id: "testBasic") { (parameters: Int, context) in + context.logger.info("Parameters=\(parameters)") + try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50))) + expectation.fulfill() + } + jobQueue.registerJob(job) + try await self.testJobQueue(jobQueue) { + try await jobQueue.push(id: job.id, parameters: 1) + try await jobQueue.push(id: job.id, parameters: 2) + try await jobQueue.push(id: job.id, parameters: 3) + try await jobQueue.push(id: job.id, parameters: 4) + try await jobQueue.push(id: job.id, parameters: 5) + try await jobQueue.push(id: job.id, parameters: 6) + try await jobQueue.push(id: job.id, parameters: 7) + try await jobQueue.push(id: job.id, parameters: 8) + try await jobQueue.push(id: job.id, parameters: 9) + try await jobQueue.push(id: job.id, parameters: 10) + + await self.wait(for: [expectation], timeout: 5) + } + } + + func testMultipleWorkers() async throws { + let jobIdentifer = JobIdentifier(#function) + let runningJobCounter = ManagedAtomic(0) + let maxRunningJobCounter = ManagedAtomic(0) + let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 10) + + let jobQueue = JobQueue(.memory, numWorkers: 4, logger: Logger(label: "HummingbirdJobsTests")) + jobQueue.registerJob(id: jobIdentifer) { parameters, context in + let runningJobs = runningJobCounter.wrappingIncrementThenLoad(by: 1, ordering: .relaxed) + if runningJobs > maxRunningJobCounter.load(ordering: .relaxed) { + maxRunningJobCounter.store(runningJobs, ordering: .relaxed) + } + try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50))) + context.logger.info("Parameters=\(parameters)") + expectation.fulfill() + runningJobCounter.wrappingDecrement(by: 1, ordering: .relaxed) + } + try await self.testJobQueue(jobQueue) { + try await jobQueue.push(id: jobIdentifer, parameters: 1) + try await jobQueue.push(id: jobIdentifer, parameters: 2) + try await jobQueue.push(id: jobIdentifer, parameters: 3) + try await jobQueue.push(id: jobIdentifer, parameters: 4) + try await jobQueue.push(id: jobIdentifer, parameters: 5) + try await jobQueue.push(id: jobIdentifer, parameters: 6) + try await jobQueue.push(id: jobIdentifer, parameters: 7) + try await jobQueue.push(id: jobIdentifer, parameters: 8) + try await jobQueue.push(id: jobIdentifer, parameters: 9) + try await jobQueue.push(id: jobIdentifer, parameters: 10) + + await self.wait(for: [expectation], timeout: 5) + + XCTAssertGreaterThan(maxRunningJobCounter.load(ordering: .relaxed), 1) + XCTAssertLessThanOrEqual(maxRunningJobCounter.load(ordering: .relaxed), 4) + } + } + + func testErrorRetryCount() async throws { + let jobIdentifer = JobIdentifier(#function) + let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 4) + let failedJobCount = ManagedAtomic(0) + struct FailedError: Error {} + var logger = Logger(label: "HummingbirdJobsTests") + logger.logLevel = .trace + let jobQueue = JobQueue( + MemoryQueue { _, _ in failedJobCount.wrappingIncrement(by: 1, ordering: .relaxed) }, + logger: logger + ) + jobQueue.registerJob(id: jobIdentifer, maxRetryCount: 3) { _, _ in + expectation.fulfill() + throw FailedError() + } + try await self.testJobQueue(jobQueue) { + try await jobQueue.push(id: jobIdentifer, parameters: 0) + + await self.wait(for: [expectation], timeout: 5) + } + XCTAssertEqual(failedJobCount.load(ordering: .relaxed), 1) + } + + func testJobSerialization() async throws { + struct TestJobParameters: Codable { + let id: Int + let message: String + } + let expectation = XCTestExpectation(description: "TestJob.execute was called") + let jobIdentifer = JobIdentifier(#function) + let jobQueue = JobQueue(.memory, numWorkers: 1, logger: Logger(label: "HummingbirdJobsTests")) + jobQueue.registerJob(id: jobIdentifer) { parameters, _ in + XCTAssertEqual(parameters.id, 23) + XCTAssertEqual(parameters.message, "Hello!") + expectation.fulfill() + } + try await self.testJobQueue(jobQueue) { + try await jobQueue.push(id: jobIdentifer, parameters: .init(id: 23, message: "Hello!")) + + await self.wait(for: [expectation], timeout: 5) + } + } + + func testJobParameters() async throws { + struct TestJobParameters: JobParameters { + static let jobID: String = "TestJobParameters" + let id: Int + let message: String + } + let expectation = XCTestExpectation(description: "TestJob.execute was called") + let jobQueue = JobQueue(.memory, numWorkers: 1, logger: Logger(label: "HummingbirdJobsTests")) + jobQueue.registerJob(parameters: TestJobParameters.self) { parameters, _ in + XCTAssertEqual(parameters.id, 23) + XCTAssertEqual(parameters.message, "Hello!") + expectation.fulfill() + } + try await self.testJobQueue(jobQueue) { + try await jobQueue.push(TestJobParameters(id: 23, message: "Hello!")) + + await self.wait(for: [expectation], timeout: 5) + } + } + + /// Verify test job is cancelled when service group is cancelled + func testShutdownJob() async throws { + let jobIdentifer = JobIdentifier(#function) + let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 1) + + let cancelledJobCount = ManagedAtomic(0) + var logger = Logger(label: "HummingbirdJobsTests") + logger.logLevel = .trace + let jobQueue = JobQueue( + MemoryQueue { _, error in + if error is CancellationError { + cancelledJobCount.wrappingIncrement(by: 1, ordering: .relaxed) + } + }, + numWorkers: 4, + logger: logger + ) + jobQueue.registerJob(id: jobIdentifer) { _, _ in + expectation.fulfill() + try await Task.sleep(for: .milliseconds(1000)) + } + try await withThrowingTaskGroup(of: Void.self) { group in + let serviceGroup = ServiceGroup( + configuration: .init( + services: [jobQueue], + gracefulShutdownSignals: [.sigterm, .sigint], + logger: Logger(label: "JobQueueService") + ) + ) + group.addTask { + try await serviceGroup.run() + } + try await jobQueue.push(id: jobIdentifer, parameters: 0) + group.cancelAll() + await self.wait(for: [expectation], timeout: 5) + } + + XCTAssertEqual(cancelledJobCount.load(ordering: .relaxed), 1) + } + + /// test job fails to decode but queue continues to process + func testFailToDecode() async throws { + let string: NIOLockedValueBox = .init("") + let jobIdentifer1 = JobIdentifier(#function) + let jobIdentifer2 = JobIdentifier(#function) + let expectation = XCTestExpectation(description: "job was called", expectedFulfillmentCount: 1) + + var logger = Logger(label: "HummingbirdJobsTests") + logger.logLevel = .debug + let jobQueue = JobQueue(.memory, numWorkers: 1, logger: Logger(label: "HummingbirdJobsTests")) + jobQueue.registerJob(id: jobIdentifer2) { parameters, _ in + string.withLockedValue { $0 = parameters } + expectation.fulfill() + } + try await self.testJobQueue(jobQueue) { + try await jobQueue.push(id: jobIdentifer1, parameters: 2) + try await jobQueue.push(id: jobIdentifer2, parameters: "test") + await self.wait(for: [expectation], timeout: 5) + } + string.withLockedValue { + XCTAssertEqual($0, "test") + } + } + + func testMultipleJobQueueHandlers() async throws { + let jobIdentifer = JobIdentifier(#function) + let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 200) + let job = JobDefinition(id: jobIdentifer) { parameters, context in + context.logger.info("Parameters=\(parameters)") + try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50))) + expectation.fulfill() + } + let logger = { + var logger = Logger(label: "HummingbirdJobsTests") + logger.logLevel = .debug + return logger + }() + let jobQueue = JobQueue(.memory, numWorkers: 2, logger: Logger(label: "HummingbirdJobsTests")) + jobQueue.registerJob(job) + let jobQueue2 = JobQueue(.memory, numWorkers: 1, logger: Logger(label: "HummingbirdJobsTests")) + jobQueue2.registerJob(job) + + try await withThrowingTaskGroup(of: Void.self) { group in + let serviceGroup = ServiceGroup( + configuration: .init( + services: [jobQueue, jobQueue2], + gracefulShutdownSignals: [.sigterm, .sigint], + logger: logger + ) + ) + group.addTask { + try await serviceGroup.run() + } + do { + for i in 0..<200 { + try await jobQueue.push(id: jobIdentifer, parameters: i) + } + await self.wait(for: [expectation], timeout: 5) + await serviceGroup.triggerGracefulShutdown() + } catch { + XCTFail("\(String(reflecting: error))") + await serviceGroup.triggerGracefulShutdown() + throw error + } + } + } +} diff --git a/scripts/validate.sh b/scripts/validate.sh new file mode 100755 index 0000000..f621e66 --- /dev/null +++ b/scripts/validate.sh @@ -0,0 +1,118 @@ +#!/bin/bash +##===----------------------------------------------------------------------===## +## +## This source file is part of the SwiftNIO open source project +## +## Copyright (c) 2017-2019 Apple Inc. and the SwiftNIO project authors +## Licensed under Apache License v2.0 +## +## See LICENSE.txt for license information +## See CONTRIBUTORS.txt for the list of SwiftNIO project authors +## +## SPDX-License-Identifier: Apache-2.0 +## +##===----------------------------------------------------------------------===## + +SWIFT_FORMAT_VERSION=0.51.15 + +set -eu +here="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" + +which swiftformat > /dev/null 2>&1 || (echo "swiftformat not installed. You can install it using 'brew install swiftformat'" ; exit -1) + +function replace_acceptable_years() { + # this needs to replace all acceptable forms with 'YEARS' + sed -e 's/20[12][0-9]-20[12][0-9]/YEARS/' -e 's/20[12][0-9]/YEARS/' -e '/^#!/ d' +} + +printf "=> Checking format... " +FIRST_OUT="$(git status --porcelain)" +if [[ -n "${CI-""}" ]]; then + printf "(using v$(mint run NickLockwood/SwiftFormat@"$SWIFT_FORMAT_VERSION" --version)) " + mint run NickLockwood/SwiftFormat@"$SWIFT_FORMAT_VERSION" . > /dev/null 2>&1 +else + printf "(using v$(swiftformat --version)) " + swiftformat . > /dev/null 2>&1 +fi +SECOND_OUT="$(git status --porcelain)" +if [[ "$FIRST_OUT" != "$SECOND_OUT" ]]; then + printf "\033[0;31mformatting issues!\033[0m\n" + git --no-pager diff + exit 1 +else + printf "\033[0;32mokay.\033[0m\n" +fi +exit +printf "=> Checking license headers... " +tmp=$(mktemp /tmp/.soto-core-sanity_XXXXXX) + +for language in swift-or-c; do + declare -a matching_files + declare -a exceptions + expections=( ) + matching_files=( -name '*' ) + case "$language" in + swift-or-c) + exceptions=( -path '*Sources/INIParser/*' -o -path '*Sources/CSotoExpat/*' -o -path '*Benchmark/.build/*' -o -name Package.swift) + matching_files=( -name '*.swift' -o -name '*.c' -o -name '*.h' ) + cat > "$tmp" <<"EOF" +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird open source project +// +// Copyright (c) YEARS the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// +EOF + ;; + bash) + matching_files=( -name '*.sh' ) + cat > "$tmp" <<"EOF" +##===----------------------------------------------------------------------===## +## +## This source file is part of the Hummingbird open source project +## +## Copyright (c) YEARS the Hummingbird authors +## Licensed under Apache License v2.0 +## +## See LICENSE.txt for license information +## See CONTRIBUTORS.txt for the list of Hummingbird authors +## +## SPDX-License-Identifier: Apache-2.0 +## +##===----------------------------------------------------------------------===## +EOF + ;; + *) + echo >&2 "ERROR: unknown language '$language'" + ;; + esac + + lines_to_compare=$(cat "$tmp" | wc -l | tr -d " ") + # need to read one more line as we remove the '#!' line + lines_to_read=$(expr "$lines_to_compare" + 1) + expected_sha=$(cat "$tmp" | shasum) + + ( + cd "$here/.." + find . \ + \( \! -path './.build/*' -a \ + \( "${matching_files[@]}" \) -a \ + \( \! \( "${exceptions[@]}" \) \) \) | while read line; do + if [[ "$(cat "$line" | head -n $lines_to_read | replace_acceptable_years | head -n $lines_to_compare | shasum)" != "$expected_sha" ]]; then + printf "\033[0;31mmissing headers in file '$line'!\033[0m\n" + diff -u <(cat "$line" | head -n $lines_to_read | replace_acceptable_years | head -n $lines_to_compare) "$tmp" + exit 1 + fi + done + printf "\033[0;32mokay.\033[0m\n" + ) +done + +rm "$tmp"