diff --git a/Package.swift b/Package.swift index 88cdac65..bf7e4f99 100644 --- a/Package.swift +++ b/Package.swift @@ -8,22 +8,25 @@ let package = Package( ], dependencies: [ // 🌎 Utility package containing tools for byte manipulation, Codable, OS APIs, and debugging. - .package(url: "https://github.com/vapor/core.git", from: "3.0.0-rc.2"), + .package(url: "https://github.com/vapor/core.git", from: "3.0.0"), // 🔑 Hashing (BCrypt, SHA, HMAC, etc), encryption, and randomness. - .package(url: "https://github.com/vapor/crypto.git", from: "3.0.0-rc.2"), + .package(url: "https://github.com/vapor/crypto.git", from: "3.0.0"), // 🗄 Core services for creating database integrations. - .package(url: "https://github.com/vapor/database-kit.git", from: "1.0.0-rc.2"), + .package(url: "https://github.com/vapor/database-kit.git", from: "1.0.0"), // 📦 Dependency injection / inversion of control framework. - .package(url: "https://github.com/vapor/service.git", from: "1.0.0-rc.2"), + .package(url: "https://github.com/vapor/service.git", from: "1.0.0"), + + // *️⃣ Build SQL queries in Swift. + .package(url: "https://github.com/vapor/sql.git", from: "1.0.0"), // Event-driven network application framework for high performance protocol servers & clients, non-blocking. .package(url: "https://github.com/apple/swift-nio.git", from: "1.0.0"), ], targets: [ - .target(name: "PostgreSQL", dependencies: ["Async", "Bits", "Core", "Crypto", "DatabaseKit", "NIO", "Service"]), + .target(name: "PostgreSQL", dependencies: ["Async", "Bits", "Core", "Crypto", "DatabaseKit", "NIO", "Service", "SQL"]), .testTarget(name: "PostgreSQLTests", dependencies: ["Core", "PostgreSQL"]), ] ) diff --git a/Sources/PostgreSQL/Connection/PostgreSQLConnection+NotifyAndListen.swift b/Sources/PostgreSQL/Connection/PostgreSQLConnection+NotifyAndListen.swift new file mode 100644 index 00000000..190aa247 --- /dev/null +++ b/Sources/PostgreSQL/Connection/PostgreSQLConnection+NotifyAndListen.swift @@ -0,0 +1,38 @@ +import Async + + +extension PostgreSQLConnection { + /// Note: after calling `listen'` on a connection, it can no longer handle other database operations. Do not try to send other SQL commands through this connection afterwards. + /// IAlso, notifications will only be sent for as long as this connection remains open; you are responsible for opening a new connection to listen on when this one closes. + internal func listen(_ channelName: String, handler: @escaping (String) throws -> ()) throws -> Future { + closeHandlers.append({ conn in + let query = PostgreSQLQuery(query: "UNLISTEN \"\(channelName)\";") + return conn.send([.query(query)], onResponse: { _ in }) + }) + + notificationHandlers[channelName] = { message in + try handler(message) + } + let query = PostgreSQLQuery(query: "LISTEN \"\(channelName)\";") + return queue.enqueue([.query(query)], onInput: { message in + switch message { + case let .notificationResponse(notification): + try self.notificationHandlers[notification.channel]?(notification.message) + default: + break + } + return false + }) + } + + internal func notify(_ channelName: String, message: String) throws -> Future { + let query = PostgreSQLQuery(query: "NOTIFY \"\(channelName)\", '\(message)';") + return send([.query(query)]).map(to: Void.self, { _ in }) + } + + internal func unlisten(_ channelName: String, unlistenHandler: (() -> Void)? = nil) throws -> Future { + notificationHandlers.removeValue(forKey: channelName) + let query = PostgreSQLQuery(query: "UNLISTEN \"\(channelName)\";") + return send([.query(query)], onResponse: { _ in unlistenHandler?() }) + } +} diff --git a/Sources/PostgreSQL/Connection/PostgreSQLConnection+Query.swift b/Sources/PostgreSQL/Connection/PostgreSQLConnection+Query.swift index 75ed5e6e..6f51d5c9 100644 --- a/Sources/PostgreSQL/Connection/PostgreSQLConnection+Query.swift +++ b/Sources/PostgreSQL/Connection/PostgreSQLConnection+Query.swift @@ -5,9 +5,9 @@ extension PostgreSQLConnection { public func query( _ string: String, _ parameters: [PostgreSQLDataConvertible] = [] - ) throws -> Future<[[PostgreSQLColumn: PostgreSQLData]]> { + ) -> Future<[[PostgreSQLColumn: PostgreSQLData]]> { var rows: [[PostgreSQLColumn: PostgreSQLData]] = [] - return try query(string, parameters) { row in + return query(string, parameters) { row in rows.append(row) }.map(to: [[PostgreSQLColumn: PostgreSQLData]].self) { return rows @@ -21,8 +21,26 @@ extension PostgreSQLConnection { _ parameters: [PostgreSQLDataConvertible] = [], resultFormat: PostgreSQLResultFormat = .binary(), onRow: @escaping ([PostgreSQLColumn: PostgreSQLData]) throws -> () + ) -> Future { + return operation { + do { + return try self._query(string, parameters, resultFormat: resultFormat, onRow: onRow) + } catch { + return self.eventLoop.newFailedFuture(error: error) + } + } + } + + /// Non-operation bounded query. + private func _query( + _ string: String, + _ parameters: [PostgreSQLDataConvertible] = [], + resultFormat: PostgreSQLResultFormat = .binary(), + onRow: @escaping ([PostgreSQLColumn: PostgreSQLData]) throws -> () ) throws -> Future { let parameters = try parameters.map { try $0.convertToPostgreSQLData() } + logger?.record(query: string, values: parameters.map { $0.description }) + let parse = PostgreSQLParseRequest( statementName: "", query: string, diff --git a/Sources/PostgreSQL/Connection/PostgreSQLConnection+SimpleQuery.swift b/Sources/PostgreSQL/Connection/PostgreSQLConnection+SimpleQuery.swift index 38e9cd3e..650e6095 100644 --- a/Sources/PostgreSQL/Connection/PostgreSQLConnection+SimpleQuery.swift +++ b/Sources/PostgreSQL/Connection/PostgreSQLConnection+SimpleQuery.swift @@ -10,11 +10,15 @@ extension PostgreSQLConnection { return rows } } - /// Sends a simple PostgreSQL query command, returning the parsed results to /// the supplied closure. public func simpleQuery(_ string: String, onRow: @escaping ([PostgreSQLColumn: PostgreSQLData]) -> ()) -> Future { - logger?.log(query: string, parameters: []) + return operation { self._simpleQuery(string, onRow: onRow) } + } + + /// Non-operation bounded simple query. + private func _simpleQuery(_ string: String, onRow: @escaping ([PostgreSQLColumn: PostgreSQLData]) -> ()) -> Future { + logger?.record(query: string) var currentRow: PostgreSQLRowDescription? let query = PostgreSQLQuery(query: string) return send([.query(query)]) { message in diff --git a/Sources/PostgreSQL/Connection/PostgreSQLConnection.swift b/Sources/PostgreSQL/Connection/PostgreSQLConnection.swift index 30bd340e..6bb3a27e 100644 --- a/Sources/PostgreSQL/Connection/PostgreSQLConnection.swift +++ b/Sources/PostgreSQL/Connection/PostgreSQLConnection.swift @@ -1,21 +1,27 @@ -import Async import Crypto import NIO /// A PostgreSQL frontend client. -public final class PostgreSQLConnection { +public final class PostgreSQLConnection: DatabaseConnection, BasicWorker { + /// See `BasicWorker`. public var eventLoop: EventLoop { return channel.eventLoop } /// Handles enqueued redis commands and responses. - private let queue: QueueHandler + internal let queue: QueueHandler /// The channel private let channel: Channel /// If non-nil, will log queries. - public var logger: PostgreSQLLogger? + public var logger: DatabaseLogger? + + /// See `DatabaseConnection`. + public var isClosed: Bool + + /// See `Extendable`. + public var extend: Extend /// Returns a new unique portal name. internal var nextPortalName: String { @@ -32,21 +38,63 @@ public final class PostgreSQLConnection { /// A unique identifier for this connection, used to generate statment and portal names private var uniqueNameCounter: UInt8 + /// In-flight `send(...)` futures. + private var currentSend: Promise? + + /// The current query running, if one exists. + private var pipeline: Future + + /// Block type to be called on close of connection + internal typealias CloseHandler = ((PostgreSQLConnection) -> Future) + /// Called on close of the connection + internal var closeHandlers = [CloseHandler]() + /// Handler type for Notifications + internal typealias NotificationHandler = (String) throws -> Void + /// Handlers to be stored by channel name + internal var notificationHandlers: [String: NotificationHandler] = [:] + /// Creates a new Redis client on the provided data source and sink. init(queue: QueueHandler, channel: Channel) { self.queue = queue self.channel = channel self.uniqueNameCounter = 0 + self.isClosed = false + self.extend = [:] + self.pipeline = channel.eventLoop.newSucceededFuture(result: ()) + channel.closeFuture.always { + self.isClosed = true + if let current = self.currentSend { + current.fail(error: closeError) + } + } } - - deinit { - close() + /// Sends `PostgreSQLMessage` to the server. + func send(_ message: [PostgreSQLMessage]) -> Future<[PostgreSQLMessage]> { + var responses: [PostgreSQLMessage] = [] + return send(message) { response in + responses.append(response) + }.map(to: [PostgreSQLMessage].self) { + return responses + } } /// Sends `PostgreSQLMessage` to the server. func send(_ messages: [PostgreSQLMessage], onResponse: @escaping (PostgreSQLMessage) throws -> ()) -> Future { + // if currentSend is not nil, previous send has not completed + assert(currentSend == nil, "Attempting to call `send(...)` again before previous invocation has completed.") + + // ensure the connection is not closed + guard !isClosed else { + return eventLoop.newFailedFuture(error: closeError) + } + + // create a new promise and store it + let promise = eventLoop.newPromise(Void.self) + currentSend = promise + + // cascade this enqueue to the newly created promise var error: Error? - return queue.enqueue(messages) { message in + queue.enqueue(messages) { message in switch message { case .readyForQuery: if let e = error { throw e } @@ -56,17 +104,28 @@ public final class PostgreSQLConnection { default: try onResponse(message) } return false // request until ready for query - } + }.cascade(promise: promise) + + // when the promise completes, remove the reference to it + promise.futureResult.always { self.currentSend = nil } + + // return the promise's future result (same as `queue.enqueue`) + return promise.futureResult } - /// Sends `PostgreSQLMessage` to the server. - func send(_ message: [PostgreSQLMessage]) -> Future<[PostgreSQLMessage]> { - var responses: [PostgreSQLMessage] = [] - return send(message) { response in - responses.append(response) - }.map(to: [PostgreSQLMessage].self) { - return responses + /// Submits an async task to be pipelined. + internal func operation(_ work: @escaping () -> Future) -> Future { + /// perform this work when the current pipeline future is completed + let new = pipeline.then(work) + + /// append this work to the pipeline, discarding errors as the pipeline + //// does not care about them + pipeline = new.catchMap { err in + return () } + + /// return the newly enqueued work's future result + return new } /// Authenticates the `PostgreSQLClient` using a username with no password. @@ -134,8 +193,29 @@ public final class PostgreSQLConnection { } } + /// Closes this client. public func close() { - channel.close(promise: nil) + _ = executeCloseHandlersThenClose() } + + + private func executeCloseHandlersThenClose() -> Future { + if let beforeClose = closeHandlers.popLast() { + return beforeClose(self).then { _ in + self.executeCloseHandlersThenClose() + } + } else { + return channel.close(mode: .all) + } + } + + + /// Called when this class deinitializes. + deinit { + close() + } + } + +private let closeError = PostgreSQLError(identifier: "closed", reason: "Connection is closed.", source: .capture()) diff --git a/Sources/PostgreSQL/Database/PostgreSQLDatabase.swift b/Sources/PostgreSQL/Database/PostgreSQLDatabase.swift index f4f9b986..470c1378 100644 --- a/Sources/PostgreSQL/Database/PostgreSQLDatabase.swift +++ b/Sources/PostgreSQL/Database/PostgreSQLDatabase.swift @@ -1,26 +1,27 @@ import Async /// Creates connections to an identified PostgreSQL database. -public final class PostgreSQLDatabase: Database { +public final class PostgreSQLDatabase: Database, LogSupporting { + /// See `LogSupporting` + public static func enableLogging(_ logger: DatabaseLogger, on conn: PostgreSQLConnection) { + conn.logger = logger + } + /// This database's configuration. public let config: PostgreSQLDatabaseConfig - /// If non-nil, will log queries. - public var logger: PostgreSQLLogger? - /// Creates a new `PostgreSQLDatabase`. public init(config: PostgreSQLDatabaseConfig) { self.config = config } /// See `Database.makeConnection()` - public func makeConnection(on worker: Worker) -> Future { + public func newConnection(on worker: Worker) -> Future { let config = self.config return Future.flatMap(on: worker) { return try PostgreSQLConnection.connect(hostname: config.hostname, port: config.port, on: worker) { error in print("[PostgreSQL] \(error)") }.flatMap(to: PostgreSQLConnection.self) { client in - client.logger = self.logger return client.authenticate( username: config.username, database: config.database, @@ -31,9 +32,6 @@ public final class PostgreSQLDatabase: Database { } } -/// A connection created by a `PostgreSQLDatabase`. -extension PostgreSQLConnection: DatabaseConnection, BasicWorker { } - extension DatabaseIdentifier { /// Default identifier for `PostgreSQLDatabase`. public static var psql: DatabaseIdentifier { diff --git a/Sources/PostgreSQL/Message+Parse/PostgreSQLMessageDecoder.swift b/Sources/PostgreSQL/Message+Parse/PostgreSQLMessageDecoder.swift index a3ae17eb..b41e5e72 100644 --- a/Sources/PostgreSQL/Message+Parse/PostgreSQLMessageDecoder.swift +++ b/Sources/PostgreSQL/Message+Parse/PostgreSQLMessageDecoder.swift @@ -49,6 +49,7 @@ final class PostgreSQLMessageDecoder: ByteToMessageDecoder { let decoder = _PostgreSQLMessageDecoder(data: messageData) let message: PostgreSQLMessage switch messageType { + case .A: message = try .notificationResponse(decoder.decode()) case .E: message = try .error(decoder.decode()) case .N: message = try .notice(decoder.decode()) case .R: message = try .authenticationRequest(decoder.decode()) diff --git a/Sources/PostgreSQL/Message/PostgreSQLMessage.swift b/Sources/PostgreSQL/Message/PostgreSQLMessage.swift index 0fdcf522..b32210b4 100644 --- a/Sources/PostgreSQL/Message/PostgreSQLMessage.swift +++ b/Sources/PostgreSQL/Message/PostgreSQLMessage.swift @@ -7,6 +7,8 @@ enum PostgreSQLMessage { case error(PostgreSQLDiagnosticResponse) /// Identifies the message as a notice. case notice(PostgreSQLDiagnosticResponse) + /// Identifies the message as a notification response. + case notificationResponse(PostgreSQLNotificationResponse) /// One of the various authentication request message formats. case authenticationRequest(PostgreSQLAuthenticationRequest) /// Identifies the message as a password response. diff --git a/Sources/PostgreSQL/Message/PostgreSQLNotificationResponse.swift b/Sources/PostgreSQL/Message/PostgreSQLNotificationResponse.swift new file mode 100644 index 00000000..cc454c27 --- /dev/null +++ b/Sources/PostgreSQL/Message/PostgreSQLNotificationResponse.swift @@ -0,0 +1,13 @@ +import Foundation + +struct PostgreSQLNotificationResponse: Decodable { + /// The message coming from PSQL + let channel: String + let message: String + init(from decoder: Decoder) throws { + let container = try decoder.singleValueContainer() + _ = try container.decode(Int32.self) + channel = try container.decode(String.self) + message = try container.decode(String.self) + } +} diff --git a/Sources/PostgreSQL/PostgreSQLProvider.swift b/Sources/PostgreSQL/PostgreSQLProvider.swift index 8a55f2cb..44744424 100644 --- a/Sources/PostgreSQL/PostgreSQLProvider.swift +++ b/Sources/PostgreSQL/PostgreSQLProvider.swift @@ -14,7 +14,7 @@ public final class PostgreSQLProvider: Provider { try services.register(DatabaseKitProvider()) services.register(PostgreSQLDatabaseConfig.self) services.register(PostgreSQLDatabase.self) - var databases = DatabaseConfig() + var databases = DatabasesConfig() databases.add(database: PostgreSQLDatabase.self, as: .psql) services.register(databases) } diff --git a/Tests/PostgreSQLTests/PostgreSQLConnectionTests.swift b/Tests/PostgreSQLTests/PostgreSQLConnectionTests.swift index 062fbecd..1de49bd6 100644 --- a/Tests/PostgreSQLTests/PostgreSQLConnectionTests.swift +++ b/Tests/PostgreSQLTests/PostgreSQLConnectionTests.swift @@ -5,6 +5,7 @@ import PostgreSQL import Core class PostgreSQLConnectionTests: XCTestCase { + let defaultTimeout = 5.0 func testVersion() throws { let client = try PostgreSQLConnection.makeTest() let results = try client.simpleQuery("SELECT version();").wait() @@ -331,10 +332,10 @@ class PostgreSQLConnectionTests: XCTestCase { /// SELECT - let acronyms = try client.query(""" + let acronyms = client.query(""" SELECT "acronyms".* FROM "acronyms" WHERE ("acronyms"."id" = $1) LIMIT 1 OFFSET 0 """, [1]) - let categories = try client.query(""" + let categories = client.query(""" SELECT "categories".* FROM "categories" WHERE ("categories"."id" = $1) LIMIT 1 OFFSET 0 """, [1]) @@ -342,6 +343,86 @@ class PostgreSQLConnectionTests: XCTestCase { _ = try categories.wait() } +// func testNotifyAndListen() throws { +// let completionHandlerExpectation1 = expectation(description: "first completion handler called") +// let completionHandlerExpectation2 = expectation(description: "final completion handler called") +// let notifyConn = try PostgreSQLConnection.makeTest() +// let listenConn = try PostgreSQLConnection.makeTest() +// let channelName = "Fooze" +// let messageText = "Bar" +// let finalMessageText = "Baz" +// +// try listenConn.listen(channelName) { text in +// if text == messageText { +// completionHandlerExpectation1.fulfill() +// } else if text == finalMessageText { +// completionHandlerExpectation2.fulfill() +// } +// }.catch({ err in XCTFail("error \(err)") }) +// +// try notifyConn.notify(channelName, message: messageText).wait() +// try notifyConn.notify(channelName, message: finalMessageText).wait() +// +// waitForExpectations(timeout: defaultTimeout) +// notifyConn.close() +// listenConn.close() +// } +// +// func testNotifyAndListenOnMultipleChannels() throws { +// let completionHandlerExpectation1 = expectation(description: "first completion handler called") +// let completionHandlerExpectation2 = expectation(description: "final completion handler called") +// let notifyConn = try PostgreSQLConnection.makeTest() +// let listenConn = try PostgreSQLConnection.makeTest() +// let channelName = "Fooze" +// let channelName2 = "Foozalz" +// let messageText = "Bar" +// let finalMessageText = "Baz" +// +// try listenConn.listen(channelName) { text in +// if text == messageText { +// completionHandlerExpectation1.fulfill() +// } +// }.catch({ err in XCTFail("error \(err)") }) +// +// try listenConn.listen(channelName2) { text in +// if text == finalMessageText { +// completionHandlerExpectation2.fulfill() +// } +// }.catch({ err in XCTFail("error \(err)") }) +// +// try notifyConn.notify(channelName, message: messageText).wait() +// try notifyConn.notify(channelName2, message: finalMessageText).wait() +// +// waitForExpectations(timeout: defaultTimeout) +// notifyConn.close() +// listenConn.close() +// } +// +// func testUnlisten() throws { +// let unlistenHandlerExpectation = expectation(description: "unlisten completion handler called") +// +// let listenHandlerExpectation = expectation(description: "listen completion handler called") +// +// let notifyConn = try PostgreSQLConnection.makeTest() +// let listenConn = try PostgreSQLConnection.makeTest() +// let channelName = "Foozers" +// let messageText = "Bar" +// +// try listenConn.listen(channelName) { text in +// if text == messageText { +// listenHandlerExpectation.fulfill() +// } +// }.catch({ err in XCTFail("error \(err)") }) +// +// try notifyConn.notify(channelName, message: messageText).wait() +// try notifyConn.unlisten(channelName, unlistenHandler: { +// unlistenHandlerExpectation.fulfill() +// }).wait() +// waitForExpectations(timeout: defaultTimeout) +// notifyConn.close() +// listenConn.close() +// } + func testURLParsing() throws { let databaseURL = "postgres://username:password@hostname.com:5432/database" let config = try PostgreSQLDatabaseConfig(url: databaseURL) @@ -361,6 +442,9 @@ class PostgreSQLConnectionTests: XCTestCase { ("testStruct", testStruct), ("testNull", testNull), ("testGH24", testGH24), +// ("testNotifyAndListen", testNotifyAndListen), +// ("testNotifyAndListenOnMultipleChannels", testNotifyAndListenOnMultipleChannels), +// ("testUnlisten", testUnlisten), ("testURLParsing", testURLParsing), ] } diff --git a/circle.yml b/circle.yml index a66ea9fc..295e04fc 100644 --- a/circle.yml +++ b/circle.yml @@ -8,6 +8,8 @@ jobs: - checkout - run: swift build - run: swift test + + linux: docker: - image: codevapor/swift:4.1 @@ -24,10 +26,20 @@ jobs: - run: name: Run unit tests command: swift test + + + linux-release: + docker: + - image: codevapor/swift:4.1 + steps: + - checkout - run: name: Compile code with optimizations command: swift build -c release + + + linux-fluent: docker: - image: codevapor/swift:4.1 @@ -56,6 +68,7 @@ workflows: jobs: - linux - linux-fluent + - linux-release # - macos nightly: