From 5201f082179bf92380e7ae3e8f1711bceec7370d Mon Sep 17 00:00:00 2001 From: Filip Schouwenaars Date: Tue, 5 Jul 2022 23:29:54 +0200 Subject: [PATCH 1/2] chore: pull in aws.sqs into package --- DESCRIPTION | 11 ++- Dockerfile | 1 - NAMESPACE | 12 ++- R/aws.R | 41 ---------- R/aws_sqs_attributes.R | 11 +++ R/aws_sqs_http.R | 103 +++++++++++++++++++++++ R/aws_sqs_messages.R | 181 +++++++++++++++++++++++++++++++++++++++++ R/main.R | 12 +-- man/receive_msg.Rd | 85 +++++++++++++++++++ man/send_msg.Rd | 59 ++++++++++++++ man/sqsHTTP.Rd | 51 ++++++++++++ 11 files changed, 512 insertions(+), 55 deletions(-) create mode 100644 R/aws_sqs_attributes.R create mode 100644 R/aws_sqs_http.R create mode 100644 R/aws_sqs_messages.R create mode 100644 man/receive_msg.Rd create mode 100644 man/send_msg.Rd create mode 100644 man/sqsHTTP.Rd diff --git a/DESCRIPTION b/DESCRIPTION index 36da082..012c2fe 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -11,13 +11,16 @@ License: MIT + file LICENSE LazyData: TRUE Depends: R (>= 3.0.0), - pkgdown, - aws.sqs + pkgdown Imports: - jsonlite + jsonlite, + aws.signature, + jsonlite, + purrr, + xml2 Remotes: datacamp/pkgdown Suggests: testthat -RoxygenNote: 6.0.1 +RoxygenNote: 7.1.1 VignetteBuilder: knitr diff --git a/Dockerfile b/Dockerfile index 4a0a8d3..355667f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -15,7 +15,6 @@ RUN apt-get install libharfbuzz-dev libfribidi-dev RUN R -e 'install.packages("remotes")' RUN R -e 'remotes::install_github("datacamp/pkgdown", ref = "master")' -RUN R -e 'install.packages("aws.sqs", repos = c(getOption("repos"), "http://cloudyr.github.io/drat"))' COPY . r-package-parser diff --git a/NAMESPACE b/NAMESPACE index a78ea69..4a2ddea 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -1,12 +1,16 @@ # Generated by roxygen2: do not edit by hand +export(consume_msg) +export(delete_msg) export(main) export(parse_description) export(parse_topics) export(process_package) -importFrom(aws.sqs,create_queue) -importFrom(aws.sqs,delete_msg) -importFrom(aws.sqs,receive_msg) +export(receive_msg) +export(send_msg) +export(sqsHTTP) +import(httr) +importFrom(aws.signature,signature_v4_auth) importFrom(jsonlite,fromJSON) importFrom(jsonlite,prettify) importFrom(jsonlite,toJSON) @@ -14,3 +18,5 @@ importFrom(jsonlite,write_json) importFrom(magrittr,"%>%") importFrom(purrr,map) importFrom(purrr,transpose) +importFrom(xml2,as_list) +importFrom(xml2,read_xml) diff --git a/R/aws.R b/R/aws.R index 8941ddb..441ac03 100644 --- a/R/aws.R +++ b/R/aws.R @@ -37,47 +37,6 @@ copy_local <- function(local, path, dirname){ } } -send_msg <- function(queue, msg, query = list(), attributes = NULL, delay = NULL, ...) { - queue <- aws.sqs:::.urlFromName(queue) - if(length(msg) > 1) { - # batch mode - batchs <- split(msg, ceiling(seq_along(msg)/10)) - - for (batch in batchs) { - l <- length(batch) - n <- 1:l - id <- paste0("msg", n) - a <- as.list(c(id, batch)) - names(a) <- c(paste0("SendMessageBatchRequestEntry.",n,".Id"), - paste0("SendMessageBatchRequestEntry.",n,".MessageBody")) - query_args <- list(Action = "SendMessageBatch") - query_mult <- rep(query, each = l) - front <- c(paste0("SendMessageBatchRequestEntry.",n, ".")) - back <- rep(names(query), each = l) - names(query_mult) <- paste0(front, back) - - body <- c(a, query_mult, query_args) - - out <- aws.sqs:::sqsHTTP(url = queue, query = body, ...) - if (inherits(out, "aws-error") || inherits(out, "unknown")) { - return(out) - } - structure(out$SendMessageBatchResponse$SendMessageBatchResult, - RequestId = out$SendMessageBatchResponse$ResponseMetadata$RequestId) - } - - } else { - # single mode - query_args <- append(query, list(Action = "SendMessage")) - query_args$MessageBody = msg - out <- aws.sqs:::sqsHTTP(url = queue, query = query_args, ...) - if (inherits(out, "aws-error") || inherits(out, "unknown")) { - return(out) - } - structure(list(out$SendMessageResponse$SendMessageResult), - RequestId = out$SendMessageResponse$ResponseMetadata$RequestId) - } -} post_job <- function(queue, json, value) { info(sprintf("Posting %s job...", value)) diff --git a/R/aws_sqs_attributes.R b/R/aws_sqs_attributes.R new file mode 100644 index 0000000..6ca6e27 --- /dev/null +++ b/R/aws_sqs_attributes.R @@ -0,0 +1,11 @@ +.urlFromName <- function(queue) { + p <- httr::parse_url(queue) + if (is.null(p$scheme)) { + out <- get_queue_url(queue) + if(!length(out)) + stop("Queue URL not found") + } else { + out <- queue + } + return(out) +} diff --git a/R/aws_sqs_http.R b/R/aws_sqs_http.R new file mode 100644 index 0000000..49c56a7 --- /dev/null +++ b/R/aws_sqs_http.R @@ -0,0 +1,103 @@ +#' @title Execute SQS API Request +#' @description This is the workhorse function to execute calls to the SQS API. +#' @details This function constructs and signs an SQS API request and returns the results thereof, or relevant debugging information in the case of error. +#' @param url A character string containing an SQS API endpoint URL. +#' @param query An optional named list containing query string parameters and their character values. +#' @param headers A list of headers to pass to the HTTP request. +#' @param verbose A logical indicating whether to be verbose. Default is given by \code{options("verbose")}. +#' @param region A character string specifying an AWS region. See \code{\link[aws.signature]{locate_credentials}}. +#' @param key A character string specifying an AWS Access Key. See \code{\link[aws.signature]{locate_credentials}}. +#' @param secret A character string specifying an AWS Secret Key. See \code{\link[aws.signature]{locate_credentials}}. +#' @param session_token Optionally, a character string specifying an AWS temporary Session Token to use in signing a request. See \code{\link[aws.signature]{locate_credentials}}. +#' @param ... Additional arguments passed to \code{\link[httr]{GET}}. +#' @return If successful, a named list. Otherwise, a data structure of class +#' \dQuote{aws_error} containing any error message(s) from AWS and information +#' about the request attempt. +#' @author Thomas J. Leeper +#' @importFrom aws.signature signature_v4_auth +#' @importFrom jsonlite fromJSON +#' @importFrom xml2 read_xml as_list +#' @import httr +#' @export +sqsHTTP <- + function( + url = NULL, + headers = list(), + query = list(), + verbose = getOption("verbose", FALSE), + region = Sys.getenv("AWS_DEFAULT_REGION", "us-east-1"), + key = NULL, + secret = NULL, + session_token = NULL, + ... + ) { + # locate and validate credentials + credentials <- aws.signature::locate_credentials(key = key, secret = secret, session_token = session_token, region = region, verbose = verbose) + key <- credentials[["key"]] + secret <- credentials[["secret"]] + session_token <- credentials[["session_token"]] + region <- credentials[["region"]] + + # generate request signature + if (is.null(url)) { + url <- paste0("https://sqs.",region,".amazonaws.com") + } + p <- httr::parse_url(url) + action <- if(p$path == "") "/" else paste0("/", p$path) + d_timestamp <- format(Sys.time(), "%Y%m%dT%H%M%SZ", tz = "UTC") + Sig <- aws.signature::signature_v4_auth( + datetime = d_timestamp, + region = region, + service = "sqs", + verb = "GET", + action = action, + query_args = query, + canonical_headers = list(host = paste0("sqs.",region,".amazonaws.com"), + `x-amz-date` = d_timestamp), + request_body = "", + key = key, + secret = secret, + session_token = session_token, + verbose = verbose) + # setup request headers + headers[["x-amz-date"]] <- d_timestamp + headers[["x-amz-content-sha256"]] <- Sig$BodyHash + headers[["Authorization"]] <- Sig[["SignatureHeader"]] + if (!is.null(session_token) && session_token != "") { + headers[["x-amz-security-token"]] <- session_token + } + H <- do.call(httr::add_headers, headers) + + # execute request + if (length(query)) { + r <- httr::GET(url, H, query = query, ...) + } else { + r <- httr::GET(url, H, ...) + } + + cont <- httr::content(r, "text", encoding = "UTF-8") + if (httr::http_error(r)) { + x <- try(xml2::as_list(xml2::read_xml(cont)), silent = TRUE) + if (inherits(x, "try-error")) { + x <- try(jsonlite::fromJSON(cont)$Error, silent = TRUE) + } + warning(paste0(httr::http_status(r)$message, ": ", x$Code, " (", x$Message, ")")) + h <- httr::headers(r) + out <- structure(x, headers = h, class = "aws_error") + attr(out, "request_canonical") <- Sig$CanonicalRequest + attr(out, "request_string_to_sign") <- Sig$StringToSign + attr(out, "request_signature") <- Sig$SignatureHeader + } else { + out <- try(jsonlite::fromJSON(cont), silent = TRUE) + if (inherits(out, "try-error")) { + out2 <- try(xml2::as_list(xml2::read_xml(cont)), silent = TRUE) + if (inherits(out2, "try-error")) { + out <- structure(cont, class = "unknown") + } else { + out <- out2 + } + + } + } + return(out) + } \ No newline at end of file diff --git a/R/aws_sqs_messages.R b/R/aws_sqs_messages.R new file mode 100644 index 0000000..a969e8e --- /dev/null +++ b/R/aws_sqs_messages.R @@ -0,0 +1,181 @@ +#' @rdname receive_msg +#' @title Receive messages +#' @description Receive one or more messages from an SQS queue. +#' @details \code{receive_msg} simply receives message(s). \code{consume_msg} does the same and then deletes the message(s) from the queue. \code{delete_message} deletes one or more messages from an SQS queue. If a message is not deleted, it remains visible in the queue and will be returned by subsequent calls to \code{\link{receive_msg}}. +#' @param queue A character string containing a queue URL, or the name of the queue. +#' @param handle A message handle, as returned by \code{\link{receive_msg}}. +#' @param attributes Currently ignored. +#' @param n The number of messages to retrieve (maximum 10). +#' @param timeout A number of seconds to make the message invisible to subsequent \code{receive_msg} requests. This modifies the queue's default visibility timeout. See \code{\link{visibility}} to modify this value after receiving a message. +#' @param wait A number of seconds to wait for messages before responding to the request. +#' @param query A list specifying additional query arguments to be passed to the \code{query} argument of \code{\link{sqsHTTP}}. +#' @param ... Additional arguments passed to \code{\link{sqsHTTP}}. +#' @return A data.frame of messages. +#' @author Thomas J. Leeper +#' @examples +#' \dontrun{ +#' # list current queues +#' list_queues() +#' +#' # create a queue +#' queue <- create_queue("ExampleQueue") +#' get_queue_url("ExampleQueue") +#' +#' # send message to queue +#' send_msg("ExampleQueue", "This is a test message") +#' # receive a message +#' (m <- receive_msg("ExampleQueue", timeout = 0)) +#' +#' # delete a message from queue +#' delete_msg("ExampleQueue", m$ReceiptHandle[1]) +#' +#' # delete queue +#' delete_queue("ExampleQueue") +#' +#' } +#' @seealso \code{link{send_msg}} +#' @references +#' \href{http://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ReceiveMessage.html}{ReceiveMessage} +#' \href{http://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_DeleteMessage.html}{DeleteMessage} +#' \href{http://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_DeleteMessageBatch.html}{DeleteMessageBatch} +#' @export +receive_msg <- function(queue, attributes = NULL, n = 1, timeout = NULL, wait = NULL, query = NULL, ...) { + queue <- .urlFromName(queue) + query_args <- c(query, list(Action = "ReceiveMessage")) + if (n > 10) { + query_args$MaxNumberOfMessages <- 10L + warning("Maxmimum 'n' exceeded so 10 used by default") + } else if (n < 1) { + stop("Only positive 'n' values can be retrieved") + } else { + query_args$MaxNumberOfMessages <- round(n, 0) + } + if (!is.null(timeout)) { + query_args$VisibilityTimeout <- round(timeout, 0) + } + if (!is.null(wait)) { + w <- as.integer(round(wait, 0)) + query_args$WaitTimeSeconds <- w + out <- sqsHTTP(url = queue, query = query_args, ...) + } else { + out <- sqsHTTP(url = queue, query = query_args, ...) + } + if (inherits(out, "aws-error") || inherits(out, "unknown")) { + return(out) + } + out2 <- out$ReceiveMessageResponse$ReceiveMessageResult + if (!length(out2)) { + out2 <- data.frame(Attributes = character(0), + Body = character(0), + MD5OfBody = character(0), + MD5OfMessageAttributes = character(0), + MessageAttributes = character(0), + MessageId = character(0), + ReceiptHandle = character(0), + stringsAsFactors = FALSE) + } + structure(out2, + RequestId = out$ReceiveMessageResponse$ResponseMetadata$RequestId) +} + +#' @rdname receive_msg +#' @param receive_args A named list of arguments, other than \code{queue}, to be passed to \code{receive_msg}. +#' @param delete_args A named list of arguments, other than \code{queue} and \code{handle}, to be passed to \code{delete_msg}. +#' @export +consume_msg <- function(queue, receive_args = list(), delete_args = list()) { + m <- do.call("receive_msg", c(receive_args, list(queue = queue))) + d <- do.call("delete_msg", c(delete_args, list(queue = queue, handle = m$ReceiptHandle))) + m +} + +#' @rdname receive_msg +#' @export +delete_msg <- function(queue, handle, query = NULL, ...) { + queue <- .urlFromName(queue) + if (length(handle) > 1) { + # batch mode + query_args <- c(query, list(action = "DeleteMessageBatch")) + n <- 1:length(handle) + id <- paste0("msg", n) + a <- as.list(c(id, handle)) + names(a) <- c(paste0("DeleteMessageBatchRequestEntry.",n,".Id"), + paste0("DeleteMessageBatchRequestEntry.",n,".ReceiptHandle")) + query_args <- c(query_args, a) + } else { + # single mode + query_args <- c(query, list(ReceiptHandle = handle, Action = "DeleteMessage")) + } + out <- sqsHTTP(url = queue, query = query_args, ...) + if (inherits(out, "aws-error") || inherits(out, "unknown")) { + return(out) + } + structure(TRUE, RequestId = out$DeleteMessageResponse$ResponseMetadata$RequestId) +} + + + +#' @title send_msg +#' @description Send a message to an SQS queue +#' @param queue A character string containing a queue URL, or the name of the queue. +#' @param msg A character vector containing one or more message bodies. +#' @param attributes Currently ignored. (If \code{msg} is of length one, a specification of message attributes. Ignored otherwise.) +#' @param delay A numeric value indicating the number of seconds between 0 and 900 to delay a specific message. If \code{NULL}, the default value for the queue applies. +#' @param query A list specifying additional query arguments to be passed to the \code{query} argument of \code{\link{sqsHTTP}}. +#' @param ... Additional arguments passed to \code{\link{sqsHTTP}}. +#' @return A list of message information, including the MessageId and an MD5 checksum of the message body. +#' @author Thomas J. Leeper +#' @examples +#' \dontrun{ +#' # list current queues +#' list_queues() +#' +#' # create a queue +#' queue <- create_queue("ExampleQueue") +#' get_queue_url("ExampleQueue") +#' +#' # send message to queue +#' send_msg("ExampleQueue", "This is a test message") +#' # receive a message +#' (m <- receive_msg("ExampleQueue", timeout = 0)) +#' +#' # delete a message from queue +#' delete_msg("ExampleQueue", m$ReceiptHandle[1]) +#' +#' # delete queue +#' delete_queue("ExampleQueue") +#' +#' } +#' @seealso \code{link{receive_msg}} \code{link{delete_msg}} +#' @references +#' \href{http://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html}{SendMessage} +#' \href{http://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessageBatch.html}{SendMessageBatch} +#' @export +send_msg <- function(queue, msg, attributes = NULL, delay = NULL, query = NULL, ...) { + queue <- .urlFromName(queue) + if(length(msg) > 1) { + # batch mode + query_args <- list(Action = "SendMessageBatch") + n <- seq_len(length(msg)) + id <- paste0("msg", n) + a <- as.list(c(id, msg)) + names(a) <- c(paste0("SendMessageBatchRequestEntry.",n,".Id"), + paste0("SendMessageBatchRequestEntry.",n,".MessageBody")) + query_args <- c(query_args, a) + out <- sqsHTTP(url = queue, query = query_args, ...) + if (inherits(out, "aws-error") || inherits(out, "unknown")) { + return(out) + } + structure(out$SendMessageBatchResponse$SendMessageBatchResult, + RequestId = out$SendMessageBatchResponse$ResponseMetadata$RequestId) + } else { + # single mode + query_args <- list(Action = "SendMessage") + query_args$MessageBody = msg + out <- sqsHTTP(url = queue, query = c(query, query_args), ...) + if (inherits(out, "aws-error") || inherits(out, "unknown")) { + return(out) + } + structure(list(out$SendMessageResponse$SendMessageResult), + RequestId = out$SendMessageResponse$ResponseMetadata$RequestId) + } +} \ No newline at end of file diff --git a/R/main.R b/R/main.R index fa01626..e2c235c 100644 --- a/R/main.R +++ b/R/main.R @@ -1,7 +1,6 @@ #' Main entry point of the package #' #' @export -#' @importFrom aws.sqs create_queue receive_msg delete_msg #' @importFrom jsonlite fromJSON toJSON prettify main <- function() { parser_version <- 2 @@ -15,14 +14,15 @@ main <- function() { to_queue <- Sys.getenv("DEST_QUEUE") # rdoc-app-worker error_queue <- Sys.getenv("DEADLETTER_QUEUE") # rdoc-r-worker-deadletter - # initialize the queues - create_queue(from_queue) - create_queue(to_queue) - create_queue(error_queue) + # # initialize the queues + # create_queue(from_queue) + # create_queue(to_queue) + # create_queue(error_queue) while(1) { - info("Polling for messages...") + info("Polling for messages on", from_queue, "...") messages <- receive_msg(from_queue, wait = 20) + print(messages) if(nrow(messages) > 0) { for (i in 1:nrow(messages)) { diff --git a/man/receive_msg.Rd b/man/receive_msg.Rd new file mode 100644 index 0000000..7eb5726 --- /dev/null +++ b/man/receive_msg.Rd @@ -0,0 +1,85 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/aws_sqs_messages.R +\name{receive_msg} +\alias{receive_msg} +\alias{consume_msg} +\alias{delete_msg} +\title{Receive messages} +\usage{ +receive_msg( + queue, + attributes = NULL, + n = 1, + timeout = NULL, + wait = NULL, + query = NULL, + ... +) + +consume_msg(queue, receive_args = list(), delete_args = list()) + +delete_msg(queue, handle, query = NULL, ...) +} +\arguments{ +\item{queue}{A character string containing a queue URL, or the name of the queue.} + +\item{attributes}{Currently ignored.} + +\item{n}{The number of messages to retrieve (maximum 10).} + +\item{timeout}{A number of seconds to make the message invisible to subsequent \code{receive_msg} requests. This modifies the queue's default visibility timeout. See \code{\link{visibility}} to modify this value after receiving a message.} + +\item{wait}{A number of seconds to wait for messages before responding to the request.} + +\item{query}{A list specifying additional query arguments to be passed to the \code{query} argument of \code{\link{sqsHTTP}}.} + +\item{...}{Additional arguments passed to \code{\link{sqsHTTP}}.} + +\item{receive_args}{A named list of arguments, other than \code{queue}, to be passed to \code{receive_msg}.} + +\item{delete_args}{A named list of arguments, other than \code{queue} and \code{handle}, to be passed to \code{delete_msg}.} + +\item{handle}{A message handle, as returned by \code{\link{receive_msg}}.} +} +\value{ +A data.frame of messages. +} +\description{ +Receive one or more messages from an SQS queue. +} +\details{ +\code{receive_msg} simply receives message(s). \code{consume_msg} does the same and then deletes the message(s) from the queue. \code{delete_message} deletes one or more messages from an SQS queue. If a message is not deleted, it remains visible in the queue and will be returned by subsequent calls to \code{\link{receive_msg}}. +} +\examples{ +\dontrun{ + # list current queues + list_queues() + + # create a queue + queue <- create_queue("ExampleQueue") + get_queue_url("ExampleQueue") + + # send message to queue + send_msg("ExampleQueue", "This is a test message") + # receive a message + (m <- receive_msg("ExampleQueue", timeout = 0)) + + # delete a message from queue + delete_msg("ExampleQueue", m$ReceiptHandle[1]) + + # delete queue + delete_queue("ExampleQueue") + +} +} +\references{ +\href{http://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ReceiveMessage.html}{ReceiveMessage} +\href{http://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_DeleteMessage.html}{DeleteMessage} +\href{http://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_DeleteMessageBatch.html}{DeleteMessageBatch} +} +\seealso{ +\code{link{send_msg}} +} +\author{ +Thomas J. Leeper +} diff --git a/man/send_msg.Rd b/man/send_msg.Rd new file mode 100644 index 0000000..d23ab86 --- /dev/null +++ b/man/send_msg.Rd @@ -0,0 +1,59 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/aws_sqs_messages.R +\name{send_msg} +\alias{send_msg} +\title{send_msg} +\usage{ +send_msg(queue, msg, attributes = NULL, delay = NULL, query = NULL, ...) +} +\arguments{ +\item{queue}{A character string containing a queue URL, or the name of the queue.} + +\item{msg}{A character vector containing one or more message bodies.} + +\item{attributes}{Currently ignored. (If \code{msg} is of length one, a specification of message attributes. Ignored otherwise.)} + +\item{delay}{A numeric value indicating the number of seconds between 0 and 900 to delay a specific message. If \code{NULL}, the default value for the queue applies.} + +\item{query}{A list specifying additional query arguments to be passed to the \code{query} argument of \code{\link{sqsHTTP}}.} + +\item{...}{Additional arguments passed to \code{\link{sqsHTTP}}.} +} +\value{ +A list of message information, including the MessageId and an MD5 checksum of the message body. +} +\description{ +Send a message to an SQS queue +} +\examples{ +\dontrun{ + # list current queues + list_queues() + + # create a queue + queue <- create_queue("ExampleQueue") + get_queue_url("ExampleQueue") + + # send message to queue + send_msg("ExampleQueue", "This is a test message") + # receive a message + (m <- receive_msg("ExampleQueue", timeout = 0)) + + # delete a message from queue + delete_msg("ExampleQueue", m$ReceiptHandle[1]) + + # delete queue + delete_queue("ExampleQueue") + +} +} +\references{ +\href{http://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html}{SendMessage} +\href{http://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessageBatch.html}{SendMessageBatch} +} +\seealso{ +\code{link{receive_msg}} \code{link{delete_msg}} +} +\author{ +Thomas J. Leeper +} diff --git a/man/sqsHTTP.Rd b/man/sqsHTTP.Rd new file mode 100644 index 0000000..b72342a --- /dev/null +++ b/man/sqsHTTP.Rd @@ -0,0 +1,51 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/aws_sqs_http.R +\name{sqsHTTP} +\alias{sqsHTTP} +\title{Execute SQS API Request} +\usage{ +sqsHTTP( + url = NULL, + headers = list(), + query = list(), + verbose = getOption("verbose", FALSE), + region = Sys.getenv("AWS_DEFAULT_REGION", "us-east-1"), + key = NULL, + secret = NULL, + session_token = NULL, + ... +) +} +\arguments{ +\item{url}{A character string containing an SQS API endpoint URL.} + +\item{headers}{A list of headers to pass to the HTTP request.} + +\item{query}{An optional named list containing query string parameters and their character values.} + +\item{verbose}{A logical indicating whether to be verbose. Default is given by \code{options("verbose")}.} + +\item{region}{A character string specifying an AWS region. See \code{\link[aws.signature]{locate_credentials}}.} + +\item{key}{A character string specifying an AWS Access Key. See \code{\link[aws.signature]{locate_credentials}}.} + +\item{secret}{A character string specifying an AWS Secret Key. See \code{\link[aws.signature]{locate_credentials}}.} + +\item{session_token}{Optionally, a character string specifying an AWS temporary Session Token to use in signing a request. See \code{\link[aws.signature]{locate_credentials}}.} + +\item{...}{Additional arguments passed to \code{\link[httr]{GET}}.} +} +\value{ +If successful, a named list. Otherwise, a data structure of class +\dQuote{aws_error} containing any error message(s) from AWS and information +about the request attempt. +} +\description{ +This is the workhorse function to execute calls to the SQS API. +} +\details{ +This function constructs and signs an SQS API request and returns the results thereof, or relevant debugging information in the case of error. +} +\author{ +Thomas J. Leeper +} From 2ebce4de24645c5afe995908b7f4886e0191e7ff Mon Sep 17 00:00:00 2001 From: Filip Schouwenaars Date: Wed, 6 Jul 2022 00:26:51 +0200 Subject: [PATCH 2/2] chore(local_testing): update receive_msg to make local testing work - When testing against localstack, the messages come in as an XML that has a different structure than the JSON from the production queues. Some hackery and cleanup was done to ensure both local and AWS queues work - In the process, R/main.R was refactored - Updated README.md with detailed instructions for local testing --- R/aws.R | 11 ++--- R/aws_sqs_attributes.R | 26 ++++++++++++ R/aws_sqs_http.R | 3 +- R/aws_sqs_messages.R | 24 +++++++---- R/main.R | 96 +++++++++++++++++++----------------------- README.md | 85 +++++++++++++++++++++++++------------ 6 files changed, 150 insertions(+), 95 deletions(-) diff --git a/R/aws.R b/R/aws.R index 441ac03..0c0eb34 100644 --- a/R/aws.R +++ b/R/aws.R @@ -37,12 +37,13 @@ copy_local <- function(local, path, dirname){ } } +send_msg_wrap <- function(queue, msg, type) { + info(paste(sprintf("Sending %s message to %s", type, queue), prettify(msg), sep="\n")) -post_job <- function(queue, json, value) { - info(sprintf("Posting %s job...", value)) - send_msg(queue, - msg = json, + send_msg(queue = queue, + msg = msg, query = list(MessageAttribute.1.Name = "type", MessageAttribute.1.Value.DataType ="String", - MessageAttribute.1.Value.StringValue = value)) + MessageAttribute.1.Value.StringValue = type)) } + diff --git a/R/aws_sqs_attributes.R b/R/aws_sqs_attributes.R index 6ca6e27..45f208a 100644 --- a/R/aws_sqs_attributes.R +++ b/R/aws_sqs_attributes.R @@ -1,3 +1,29 @@ +#' @title Get a queue URL +#' @aliases get_queue_url +#' @description Retrieves the URL for an SQS queue by its name. +#' @param name A character string containing the name of the queue. +#' @param owner A character string containing the AWS Account ID that created the queue. +#' @param query A list specifying additional query arguments to be passed to the \code{query} argument of \code{\link{sqsHTTP}}. +#' @param ... Additional arguments passed to \code{\link{sqsHTTP}}. +#' @return If successful, a character string containing an SQS Queue URL. Otherwise, a data structure of class \dQuote{aws_error} containing any error message(s) from AWS and information about the request attempt. +#' @author Thomas J. Leeper +#' @seealso \code{link{create_queue}} \code{link{delete_queue}} \code{\link{get_queue_attrs}} \code{\link{set_queue_attrs}} +#' @references +#' \href{http://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_GetQueueUrl.html}{GetQueueURL} +#' @export +get_queue_url <- function(name, owner = NULL, query = NULL, ...) { + query_args <- c(query, list(Action = "GetQueueUrl", QueueName = name)) + if (!is.null(owner)) { + query_args$QueueOwnerAWSAccountId <- owner + } + out <- sqsHTTP(query = query_args, ...) + if (inherits(out, "aws-error") || inherits(out, "unknown")) { + return(out) + } + structure(out$GetQueueUrlResponse$GetQueueUrlResult$QueueUrl, + RequestId = out$GetQueueUrlResponse$ResponseMetadata$RequestId) +} + .urlFromName <- function(queue) { p <- httr::parse_url(queue) if (is.null(p$scheme)) { diff --git a/R/aws_sqs_http.R b/R/aws_sqs_http.R index 49c56a7..09398a3 100644 --- a/R/aws_sqs_http.R +++ b/R/aws_sqs_http.R @@ -88,7 +88,7 @@ sqsHTTP <- attr(out, "request_string_to_sign") <- Sig$StringToSign attr(out, "request_signature") <- Sig$SignatureHeader } else { - out <- try(jsonlite::fromJSON(cont), silent = TRUE) + out <- try(jsonlite::fromJSON(cont, simplifyVector = FALSE), silent = TRUE) if (inherits(out, "try-error")) { out2 <- try(xml2::as_list(xml2::read_xml(cont)), silent = TRUE) if (inherits(out2, "try-error")) { @@ -96,7 +96,6 @@ sqsHTTP <- } else { out <- out2 } - } } return(out) diff --git a/R/aws_sqs_messages.R b/R/aws_sqs_messages.R index a969e8e..b15e641 100644 --- a/R/aws_sqs_messages.R +++ b/R/aws_sqs_messages.R @@ -64,17 +64,23 @@ receive_msg <- function(queue, attributes = NULL, n = 1, timeout = NULL, wait = return(out) } out2 <- out$ReceiveMessageResponse$ReceiveMessageResult + if (is.null(out2$messages) && !is.null(out2$Message)) { + messages <- list() + for (i in 1:length(out2$Message$MessageId)) { + messages[[i]] <- list( + MessageId = out2$Message$MessageId[[i]], + ReceiptHandle = out2$Message$ReceiptHandle[[i]], + MD5OfBody = out2$Message$MD5OfBody[[i]], + Body = out2$Message$Body[[i]] + ) + } + } else { + messages <- out2$messages + } if (!length(out2)) { - out2 <- data.frame(Attributes = character(0), - Body = character(0), - MD5OfBody = character(0), - MD5OfMessageAttributes = character(0), - MessageAttributes = character(0), - MessageId = character(0), - ReceiptHandle = character(0), - stringsAsFactors = FALSE) + messages <- list() } - structure(out2, + structure(messages, RequestId = out$ReceiveMessageResponse$ResponseMetadata$RequestId) } diff --git a/R/main.R b/R/main.R index e2c235c..dfe86db 100644 --- a/R/main.R +++ b/R/main.R @@ -9,66 +9,58 @@ main <- function() { source(".env.R") } - # names for the queues + # Get queue names from env variables from_queue <- Sys.getenv("SOURCE_QUEUE") - to_queue <- Sys.getenv("DEST_QUEUE") # rdoc-app-worker - error_queue <- Sys.getenv("DEADLETTER_QUEUE") # rdoc-r-worker-deadletter - - # # initialize the queues - # create_queue(from_queue) - # create_queue(to_queue) - # create_queue(error_queue) + to_queue <- Sys.getenv("DEST_QUEUE") + error_queue <- Sys.getenv("DEADLETTER_QUEUE") while(1) { info("Polling for messages on", from_queue, "...") messages <- receive_msg(from_queue, wait = 20) - print(messages) - if(nrow(messages) > 0) { - for (i in 1:nrow(messages)) { - delete_files() - message <- as.list(messages[i, ]) - info(paste("Received message:", prettify(message$Body), sep="\n")) - body <- fromJSON(message$Body) - repo_type <- body$repoType - if (is.null(repo_type)) { - repo_type <- 'cran' - } + for (message in messages) { + info(paste("Received message:", prettify(message$Body), sep="\n")) + body <- fromJSON(message$Body) + repo_type <- body$repoType + if (is.null(repo_type)) { + repo_type <- 'cran' + } - tm <- as.POSIXlt(Sys.time(), Sys.timezone(), "%Y-%m-%dT%H:%M:%S") - datetime <- format(tm , "%Y-%m-%dT%H:%M:%S%z") - result <- tryCatch({ - res <- process_package(body$path, body$name, repo_type) - res$description$jobInfo <- list(package = body$name, - version = body$version, - parsingStatus = "success", - parserVersion = parser_version, - parsedAt = datetime) - info("Putting description and topics on S3 ...") - dump_jsons_on_s3(res$description, res$topics) - post_job(to_queue, toJSON(res$description, auto_unbox = TRUE), "version") - post_job(to_queue, sapply(res$topics, toJSON, auto_unbox = TRUE), "topic") - }, - error = function(e) { - errorObject <- character(0) - errorObject$jobInfo <- list(message_body = body, - error = e$message, - parsingStatus = "failed", - parserVersion = parser_version, - parsedAt = datetime) + tm <- as.POSIXlt(Sys.time(), Sys.timezone(), "%Y-%m-%dT%H:%M:%S") + datetime <- format(tm , "%Y-%m-%dT%H:%M:%S%z") + result <- tryCatch({ + res <- process_package(body$path, body$name, repo_type) + res$description$jobInfo <- list(package = body$name, + version = body$version, + parsingStatus = "success", + parserVersion = parser_version, + parsedAt = datetime) + info("Putting description and topics on S3 ...") + # dump_jsons_on_s3(res$description, res$topics) + send_msg_wrap(to_queue, toJSON(res$description, auto_unbox = TRUE), "version") + for (topic in res$topics) { + send_msg_wrap(to_queue, toJSON(topic, auto_unbox = TRUE), "topic") + } + }, + error = function(e) { + errorObject <- character(0) + errorObject$jobInfo <- list(message_body = body, + error = e$message, + parsingStatus = "failed", + parserVersion = parser_version, + parsedAt = datetime) - error_json <- toJSON(errorObject, auto_unbox = TRUE) - message(prettify(error_json)) - info("Putting job in deadletter queue ...") - post_job(error_queue, error_json, "error") - }, finally = { - delete_files() - info("Deleting job from SQS ...") - delete_msg(from_queue, message$ReceiptHandle) - info("Garbage collection ...") - gc() - }) - } + error_json <- toJSON(errorObject, auto_unbox = TRUE) + message(prettify(error_json)) + info("Putting job in deadletter queue ...") + send_msg_wrap(error_queue, error_json, "error") + }, finally = { + delete_files() + info("Deleting job from SQS ...") + delete_msg(from_queue, message$ReceiptHandle) + info("Garbage collection ...") + gc() + }) } } } diff --git a/README.md b/README.md index 07f0ccc..38b9f8a 100644 --- a/README.md +++ b/README.md @@ -21,8 +21,7 @@ We have forked our own version of `pkgdown` which we use here: https://github.co - Set an environment variable `GITHUB_PAT` - Install the package's dependencies: ```R - remotes::install_github("datacamp/pkgdown", ref = "master") - install.packages("aws.sqs", repos = c(getOption("repos"), "http://cloudyr.github.io/drat")) + devtools::install_github("datacamp/pkgdown", ref = "master") ``` - Open up `RPackageParser.RProj` in RStudio. - Select Build > Load All; this will make all exported and unexported functions of the package available. @@ -30,52 +29,84 @@ We have forked our own version of `pkgdown` which we use here: https://github.co ```R res <- process_package("https://cran.r-project.org/src/contrib/Archive/R6/R6_2.5.0.tar.gz", "R6", "cran") ``` + +### Test the parsing logic -### Polling and posting to SQS queues +If you just want to test the downloading, unpacking and parsing of a package and its topics: -First, add a file `.env.R` in the package root folder with info that AWS needs: +1. `devtools::load_all(".")` +2. `library("RPackageParser")` +3. `res <- process_package("https://cran.r-project.org/src/contrib/REdaS_0.9.4.tar.gz", "REdaS", "cran")`: replace these arguments with the ones of the package you want to test. +4. `write(jsonlite::toJSON(res$topics[[1]],auto_unbox = TRUE), file = 'topic.json')`: this will create a `topic.json` file in the root of the project that contains the JSON that will be added to the queue. This is what the API will process before adding the topic to the mysql database. -```R -Sys.setenv(AWS_ACCESS_KEY_ID = "ACCESS_KEY_ID", - AWS_SECRET_ACCESS_KEY = "SECRET_ACCESS_KEY", - AWS_DEFAULT_REGION = "us-east-1", - DEST_QUEUE = "rdoc-app-worker", - SOURCE_QUEUE = "rdoc-r-worker", - DEADLETTER_QUEUE = "rdoc-r-worker-deadletter") +### Setting up local SQS queues + +First, install LocalStack to locally set up SQS queues +``` +pip3 install --user localstack +pip3 install --user awscli-local +# Start the container +localstack start -d +# Create the local queues +awslocal sqs create-queue --queue-name rdoc-r-worker-local +awslocal sqs create-queue --queue-name rdoc-r-deadletter-local +awslocal sqs create-queue --queue-name rdoc-app-worker-local ``` -You need to add AWS keys that have write access to the SQS queues so that you can post messages to the queue. -You can find `AWS_ACCESS_KEY_ID` in the AWS Parameter Store, but `AWS_SECRET_ACCESS_KEY` will be encrypted there so you will need to request that value from the infra team. +add a file `.env.R` in the package root folder with dummy AWS credentials and the right queue names: -After that, you can run `main()`; this will poll the SQS queues and do all the processing: +```R +Sys.setenv( + AWS_ACCESS_KEY_ID ="AKI", + AWS_SECRET_ACCESS_KEY = "SAK", + AWS_DEFAULT_REGION = "us-east-1", + SOURCE_QUEUE = "http://localhost:4566/000000000000/rdoc-r-worker-local", + DEADLETTER_QUEUE = "http://localhost:4566/000000000000/rdoc-r-worker-deadletter-local", + DEST_QUEUE = "http://localhost:4566/000000000000/rdoc-app-worker-local" +) +``` + +Now you can run the `main()` function: ```R RPackageParser::main() ``` -### Add messages to the queue - -If you want to add messages to the queue for local testing, setup the aws cli and then run: +Finally, you can add messages to the queue, e.g.: ``` -aws sqs send-message --queue-url https://queue.amazonaws.com/301258414863/rdoc-r-worker --message-body '{"name":"ReorderCluster","version":"1.0","path":"ftp://cran.r-project.org/pub/R/src/contrib/ReorderCluster_1.0.tar.gz"}' +awslocal sqs send-message --queue-url http://localhost:4566/000000000000/rdoc-r-worker-local--message-body '{"name":"sm","version":"2.2-5.7","path":"ftp://cran.r-project.org/pub/R/src/contrib/sm_2.2-5.7.tar.gz"}' ``` -where you replace the body with the package that you want to test. +The `main()` function will ingest new messages, run `process_packages()`, and posts the parsed JSONs on the destination queue. -Note that this is the production queue, which means that the queue will be processed both by your local parser and the production parser, and whoever pics the message first will be the one to process it. That's why you might need to send a few requests until your local parser can pick the message. +## Connecting to staging/production AWS queues -After you added your message to the [rdoc-r-worker queue](https://us-east-1.console.aws.amazon.com/sqs/v2/home?region=us-east-1#/queues/https%3A%2F%2Fsqs.us-east-1.amazonaws.com%2F301258414863%2Frdoc-r-worker/send-receive), you should see it for a brief moment in AWS while its being processed. After the processing is done, you should be able to see new messages in [rdoc-app-worker queue](https://us-east-1.console.aws.amazon.com/sqs/v2/home?region=us-east-1#/queues/https%3A%2F%2Fsqs.us-east-1.amazonaws.com%2F301258414863%2Frdoc-app-worker/send-receive#/) (click on the "Poll for messages" button in the aws console). +Although this is not advised, you can also have a locally running `main()` function connect to staging and production queues on AWS. -### Testing locally without SQS queues +To do so, you need to add AWS keys that have write access to the SQS queues so that you can post messages to the queue. Ask Filip, Zaurbek or the infrastructure team for valid `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` credentials and update your `.env.R` file: -If you just want to test pulling a package and generating the output that will be added to the destination queue, just open this project in RStudio and run these commands in the console: +```R +Sys.setenv( + AWS_ACCESS_KEY_ID ="SECRET", + AWS_SECRET_ACCESS_KEY = "SECRET", + AWS_DEFAULT_REGION = "us-east-1", + SOURCE_QUEUE = "rdoc-r-worker", + DEST_QUEUE = "rdoc-app-worker", + DEADLETTER_QUEUE = "rdoc-r-worker-deadletter" +) +``` -1. `devtools::load_all(".")` -2. `library("RPackageParser")` -3. `res <- process_package("https://cran.r-project.org/src/contrib/REdaS_0.9.4.tar.gz", "REdaS", "cran")`: replace these arguments with the ones of the package you want to test. -4. `write(jsonlite::toJSON(res$topics[[1]],auto_unbox = TRUE), file = 'topic.json')`: this will create a `topic.json` file in the root of the project that contains the JSON that will be added to the queue. This is what the API will process before adding the topic to the mysql database. +To add a message to e.g. the production queue: + +``` +aws sqs send-message --queue-url https://queue.amazonaws.com/301258414863/rdoc-r-worker --message-body '{"name":"ReorderCluster","version":"1.0","path":"ftp://cran.r-project.org/pub/R/src/contrib/ReorderCluster_1.0.tar.gz"}' +``` + +Note that this is the production queue, which means that the queue will be processed both by your local parser and the production parser, and whoever pics the message first will be the one to process it. That's why you might need to send a few requests until your local parser can pick the message. + +After you added your message to the [rdoc-r-worker queue](https://us-east-1.console.aws.amazon.com/sqs/v2/home?region=us-east-1#/queues/https%3A%2F%2Fsqs.us-east-1.amazonaws.com%2F301258414863%2Frdoc-r-worker/send-receive), you should see it for a brief moment in AWS while its being processed. After the processing is done, you should be able to see new messages in [rdoc-app-worker queue](https://us-east-1.console.aws.amazon.com/sqs/v2/home?region=us-east-1#/queues/https%3A%2F%2Fsqs.us-east-1.amazonaws.com%2F301258414863%2Frdoc-app-worker/send-receive#/) (click on the "Poll for messages" button in the aws console). ## Deployment