Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use sts to construct stream arn #463

Merged
merged 3 commits into from
Dec 23, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ set(STATIC_LIBS
boost_chrono)

find_package(Threads)
find_package(AWSSDK REQUIRED COMPONENTS kinesis monitoring)
find_package(AWSSDK REQUIRED COMPONENTS kinesis monitoring sts)

add_library(LibCrypto STATIC IMPORTED)
set_property(TARGET LibCrypto PROPERTY IMPORTED_LOCATION ${THIRD_PARTY_LIB_DIR}/libcrypto.a)
Expand Down
40 changes: 38 additions & 2 deletions aws/kinesis/core/pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#include <boost/format.hpp>
#include <iomanip>

#include <aws/core/utils/ARN.h>
#include <aws/core/utils/StringUtils.h>
#include <aws/kinesis/core/aggregator.h>
#include <aws/kinesis/core/collector.h>
#include <aws/kinesis/core/configuration.h>
Expand All @@ -29,6 +31,12 @@
#include <aws/kinesis/KinesisClient.h>
#include <aws/metrics/metrics_manager.h>
#include <aws/utils/processing_statistics_logger.h>
#include <aws/sts/STSClient.h>
#include <aws/sts/model/GetCallerIdentityRequest.h>
#include <aws/sts/model/GetCallerIdentityResult.h>

#include <aws/utils/logging.h>


namespace aws {
namespace kinesis {
Expand All @@ -49,6 +57,7 @@ class Pipeline : boost::noncopyable {
Retrier::UserRecordCallback finish_user_record_cb)
: stream_(std::move(stream)),
region_(std::move(region)),
stream_arn_(std::move(init_stream_arn(region_, stream_))),
config_(std::move(config)),
stats_logger_(stream_, config_->record_max_buffered_time()),
executor_(std::move(executor)),
Expand All @@ -60,6 +69,7 @@ class Pipeline : boost::noncopyable {
executor_,
kinesis_client_,
stream_,
stream_arn_,
metrics_manager_)),
aggregator_(
std::make_shared<Aggregator>(
Expand Down Expand Up @@ -151,7 +161,7 @@ class Pipeline : boost::noncopyable {
}

void send_put_records_request(const std::shared_ptr<PutRecordsRequest>& prr) {
auto prc = std::make_shared<PutRecordsContext>(stream_, prr->items());
auto prc = std::make_shared<PutRecordsContext>(stream_, stream_arn_, prr->items());
prc->set_start(std::chrono::steady_clock::now());
kinesis_client_->PutRecordsAsync(
prc->to_sdk_request(),
Expand Down Expand Up @@ -190,8 +200,34 @@ class Pipeline : boost::noncopyable {
});
}

std::string stream_;
// Build the Kinesis stream ARN from the partition and account ID reported by STS GetCallerIdentity. Returns an empty string if the STS call fails.
static std::string init_stream_arn(const std::string &region, const std::string &stream_name) {
Aws::STS::STSClient sts;
Aws::STS::Model::GetCallerIdentityRequest request;
auto outcome = sts.GetCallerIdentity(request);
if (outcome.IsSuccess()) {
auto result = outcome.GetResult();
Aws::Utils::ARN sts_arn(result.GetArn());

// Construct and return the Kinesis stream ARN.
std::stringstream arn;
arn << "arn:" << sts_arn.GetPartition() << ":kinesis:" << region << ":" << result.GetAccount()
<< ":stream/" << stream_name;
Comment on lines +217 to +218
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this stream ARN format the same for all regions? I thought we had some issues with the ARN format in some regions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good callout. We once had a problem with pod1, but I think the format is good for all public commercial regions.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, we should still test to make sure this works in all partitions.


auto arn_str = arn.str();
LOG(info) << "StreamARN \"" << arn_str << "\" has been successfully configured, "
<< "and will be used in requests including ListShards and PutRecords";
return arn_str;
}

LOG(warning) << "Failed to get StreamARN using STS GetCallerIdentity with exception: "

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should throw if this fails.

That will avoid operating in dual mode and will have all applications start using the ARN from this version onwards.

<< outcome.GetError().GetMessage().c_str();
return {};
}

std::string region_;
std::string stream_;
std::string stream_arn_;
std::shared_ptr<Configuration> config_;
aws::utils::processing_statistics_logger stats_logger_;
std::shared_ptr<aws::utils::Executor> executor_;
Expand Down
10 changes: 9 additions & 1 deletion aws/kinesis/core/put_records_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,20 @@ namespace core {
class PutRecordsContext : public Aws::Client::AsyncCallerContext {
public:
PutRecordsContext(std::string stream,
std::string stream_arn,
std::vector<std::shared_ptr<KinesisRecord>> records)
: stream_(stream),
: stream_(std::move(stream)),
stream_arn_(std::move(stream_arn)),
records_(std::move(records)) {}

const std::string& get_stream() const {
return stream_;
}

const std::string& get_stream_arn() const {
return stream_arn_;
}

std::chrono::steady_clock::time_point get_start() const {
return start_;
}
Expand Down Expand Up @@ -76,6 +82,7 @@ class PutRecordsContext : public Aws::Client::AsyncCallerContext {
req.AddRecords(std::move(e));
}
req.SetStreamName(stream_);
if (!stream_arn_.empty()) req.SetStreamARN(stream_arn_);
return req;
}

Expand All @@ -96,6 +103,7 @@ class PutRecordsContext : public Aws::Client::AsyncCallerContext {

private:
std::string stream_;
std::string stream_arn_;
std::chrono::steady_clock::time_point start_;
std::chrono::steady_clock::time_point end_;
std::vector<std::shared_ptr<KinesisRecord>> records_;
Expand Down
1 change: 1 addition & 0 deletions aws/kinesis/core/retrier.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class MetricsPutter {
private:
std::shared_ptr<aws::metrics::MetricsManager> metrics_manager_;
std::string stream_;
std::string stream_arn_;
};

} // namespace detail
Expand Down
15 changes: 10 additions & 5 deletions aws/kinesis/core/shard_map.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@ ShardMap::ShardMap(
std::shared_ptr<aws::utils::Executor> executor,
std::shared_ptr<Aws::Kinesis::KinesisClient> kinesis_client,
std::string stream,
std::string stream_arn,
std::shared_ptr<aws::metrics::MetricsManager> metrics_manager,
std::chrono::milliseconds min_backoff,
std::chrono::milliseconds max_backoff)
: executor_(std::move(executor)),
kinesis_client_(std::move(kinesis_client)),
stream_(std::move(stream)),
stream_arn_(std::move(stream_arn)),
metrics_manager_(std::move(metrics_manager)),
state_(INVALID),
min_backoff_(min_backoff),
Expand Down Expand Up @@ -101,8 +103,10 @@ void ShardMap::list_shards(const Aws::String& next_token) {

if (!next_token.empty()) {
req.SetNextToken(next_token);
if (!stream_arn_.empty()) req.SetStreamARN(stream_arn_);
} else {
req.SetStreamName(stream_);
if (!stream_arn_.empty()) req.SetStreamARN(stream_arn_);
Aws::Kinesis::Model::ShardFilter shardFilter;
shardFilter.SetType(Aws::Kinesis::Model::ShardFilterType::AT_LATEST);
req.SetShardFilter(shardFilter);
Expand Down Expand Up @@ -146,13 +150,14 @@ void ShardMap::list_shards_callback(
updated_at_ = std::chrono::steady_clock::now();

LOG(info) << "Successfully updated shard map for stream \""
<< stream_ << "\" found " << end_hash_key_to_shard_id_.size()
<< " shards";
<< stream_ << (stream_arn_.empty() ? "\"" : "\" (arn: \"" + stream_arn_ + "\")")
<< ". Found " << end_hash_key_to_shard_id_.size() << " shards";
}

void ShardMap::update_fail(const std::string& code, const std::string& msg) {
LOG(error) << "Shard map update for stream \"" << stream_ << "\" failed. "
<< "Code: " << code << " Message: " << msg << "; retrying in "
void ShardMap::update_fail(const std::string &code, const std::string &msg) {
LOG(error) << "Shard map update for stream \""
<< stream_ << (stream_arn_.empty() ? "\"" : "\" (arn: \"" + stream_arn_ + "\")")
<< "failed. Code: " << code << " Message: " << msg << "; retrying in "
<< backoff_.count() << " ms";

WriteLock lock(mutex_);
Expand Down
2 changes: 2 additions & 0 deletions aws/kinesis/core/shard_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class ShardMap : boost::noncopyable {
ShardMap(std::shared_ptr<aws::utils::Executor> executor,
std::shared_ptr<Aws::Kinesis::KinesisClient> kinesis_client,
std::string stream,
std::string stream_arn,
std::shared_ptr<aws::metrics::MetricsManager> metrics_manager
= std::make_shared<aws::metrics::NullMetricsManager>(),
std::chrono::milliseconds min_backoff = kMinBackoff,
Expand Down Expand Up @@ -88,6 +89,7 @@ class ShardMap : boost::noncopyable {
std::shared_ptr<aws::utils::Executor> executor_;
std::shared_ptr<Aws::Kinesis::KinesisClient> kinesis_client_;
std::string stream_;
std::string stream_arn_;
std::shared_ptr<aws::metrics::MetricsManager> metrics_manager_;

State state_;
Expand Down
1 change: 1 addition & 0 deletions aws/kinesis/core/test/retrier_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ auto make_prr_ctx(size_t num_kr,
}
auto ctx = std::make_shared<aws::kinesis::core::PutRecordsContext>(
"myStream",
"arn:aws:kinesis:us-east-2:123456789012:stream/myStream",
krs);
ctx->set_outcome(outcome);
return ctx;
Expand Down
2 changes: 2 additions & 0 deletions aws/kinesis/core/test/shard_map_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
namespace {

const std::string kStreamName = "myStream";
const std::string kStreamARN = "arn:aws:kinesis:us-east-2:123456789012:stream/myStream";

Aws::Client::ClientConfiguration fake_client_cfg() {
Aws::Client::ClientConfiguration cfg;
Expand Down Expand Up @@ -88,6 +89,7 @@ class Wrapper {
outcomes_list_shards,
[this] { num_req_received_++; }),
kStreamName,
kStreamARN,
std::make_shared<aws::metrics::NullMetricsManager>(),
std::chrono::milliseconds(100),
std::chrono::milliseconds(1000));
Expand Down
21 changes: 8 additions & 13 deletions bootstrap.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,13 @@ silence() {
fi
}

OPENSSL_VERSION="1.1.1q"
OPENSSL_VERSION="1.1.1s"
BOOST_VERSION="1.80.0"
BOOST_VERSION_UNDERSCORED="${BOOST_VERSION//\./_}" # convert from 1.80.0 to 1_80_0
ZLIB_VERSION="1.2.12"
PROTOBUF_VERSION="3.11.4"
CURL_VERSION="7.85.0"
AWS_SDK_CPP_VERSION="1.9.67"

# 1.1.1q is not stable on macOS, downgraded to 1.1.1n
# see https://github.com/openssl/openssl/issues/18720
if [[ $(uname) == 'Darwin' ]]; then
OPENSSL_VERSION="1.1.1n"
fi
CURL_VERSION="7.86.0"
AWS_SDK_CPP_VERSION="1.10.32"

LIB_OPENSSL="https://www.openssl.org/source/openssl-${OPENSSL_VERSION}.tar.gz"
LIB_BOOST="https://boostorg.jfrog.io/artifactory/main/release/${BOOST_VERSION}/source/boost_${BOOST_VERSION_UNDERSCORED}.tar.gz"
Expand Down Expand Up @@ -222,9 +216,10 @@ if [ ! -d "curl-${CURL_VERSION}" ]; then
cd curl-${CURL_VERSION}

if [[ $(uname) == 'Darwin' ]]; then
silence conf --disable-shared --disable-ldap --disable-ldaps --without-libidn2 \
--enable-threaded-resolver --disable-debug --without-libssh2 --without-ca-bundle --with-openssl

silence conf --with-openssl --enable-threaded-resolver \
--disable-shared --disable-ldap --disable-ldaps --disable-debug \
--without-libidn2 --without-libssh2 --without-ca-bundle \
--without-brotli --without-nghttp2 --without-librtmp --without-zstd
# Apply a patch for macOS that should prevent curl from trying to use clock_gettime
# This is a temporary workaround for https://github.com/awslabs/amazon-kinesis-producer/issues/117
# until dependencies are updated
Expand Down Expand Up @@ -255,7 +250,7 @@ if [ ! -d "aws-sdk-cpp" ]; then
cd aws-sdk-cpp-build

silence $CMAKE \
-DBUILD_ONLY="kinesis;monitoring" \
-DBUILD_ONLY="kinesis;monitoring;sts" \
-DCMAKE_BUILD_TYPE=RelWithDebInfo \
-DSTATIC_LINKING=1 \
-DCMAKE_PREFIX_PATH="$INSTALL_DIR" \
Expand Down
2 changes: 1 addition & 1 deletion java/amazon-kinesis-producer-sample/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-kinesis-producer</artifactId>
<version>0.14.13</version>
<version>0.15.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
Expand Down
5 changes: 3 additions & 2 deletions java/amazon-kinesis-producer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-kinesis-producer</artifactId>
<version>0.14.13</version>
<version>0.15.0-SNAPSHOT</version>
<name>Amazon Kinesis Producer Library</name>

<scm>
Expand Down Expand Up @@ -76,7 +76,7 @@
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.21.5</version>
<version>3.21.7</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
Expand Down Expand Up @@ -113,6 +113,7 @@
<groupId>org.mock-server</groupId>
<artifactId>mockserver-netty-no-dependencies</artifactId>
<version>5.14.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.xml.bind.DatatypeConverter;
import jakarta.xml.bind.DatatypeConverter;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
public final class GlueSchemaRegistrySerializerInstance {

private volatile GlueSchemaRegistrySerializer instance = null;
private static final String USER_AGENT_APP_NAME = "kpl-0.14.13";
private static final String USER_AGENT_APP_NAME = "kpl-0.15.0-SNAPSHOT";

/**
* Instantiate GlueSchemaRegistrySerializer using the KinesisProducerConfiguration.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import java.security.MessageDigest;
import java.util.Arrays;

import javax.xml.bind.DatatypeConverter;
import jakarta.xml.bind.DatatypeConverter;

import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ public ListenableFuture<UserRecordResult> addUserRecord(UserRecord userRecord) {
* The hash value used to explicitly determine the shard the data
* record is assigned to by overriding the partition key hash.
* Must be a valid string representation of a positive integer
* with value between 0 and <tt>2^128 - 1</tt> (inclusive).
* with value between 0 and <code>2^128 - 1</code> (inclusive).
* @param data
* Binary data of the record. Maximum size 1MiB.
* @return A future for the result of the put.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ public class UserRecord {
*/
private String streamName;

/**
* ARN of the stream, e.g., arn:aws:kinesis:us-east-2:123456789012:stream/mystream
*/
private String streamARN;

/**
* Partition key. Length must be at least one, and at most 256 (inclusive).
*/
Expand All @@ -19,7 +24,7 @@ public class UserRecord {
* The hash value used to explicitly determine the shard the data
* record is assigned to by overriding the partition key hash.
* Must be a valid string representation of a positive integer
* with value between 0 and <tt>2^128 - 1</tt> (inclusive).
* with value between 0 and <code>2^128 - 1</code> (inclusive).
*/
private String explicitHashKey;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import java.security.DigestInputStream;
import java.security.MessageDigest;

import javax.xml.bind.DatatypeConverter;
import jakarta.xml.bind.DatatypeConverter;

import org.apache.commons.io.IOUtils;
import org.junit.After;
Expand Down