diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0abf16ac..c5df3398 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,11 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).
+## 2.18.0 (2022-01-12)
+### Changed
+- Upgraded Sarama library to 1.30.1 in order to support Kafka latest versions (up to 3.0)
+- Added consumer-offset integration test
+
## 2.17.0 (2021-06-27)
### Changed
- Moved default config.sample to [V4](https://docs.newrelic.com/docs/create-integrations/infrastructure-integrations-sdk/specifications/host-integrations-newer-configuration-format/), added a dependency for infra-agent version 1.20.0
diff --git a/go.mod b/go.mod
index ec5fb7c0..df8233bb 100644
--- a/go.mod
+++ b/go.mod
@@ -3,7 +3,7 @@ module github.com/newrelic/nri-kafka
go 1.16
require (
- github.com/Shopify/sarama v1.27.0
+ github.com/Shopify/sarama v1.30.1
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/eapache/go-resiliency v1.2.1-0.20190716143035-b98ce2825f72 // indirect
github.com/eapache/queue v1.1.1-0.20180227141424-093482f3f8ce // indirect
diff --git a/go.sum b/go.sum
index abd73ffa..8c38bf65 100644
--- a/go.sum
+++ b/go.sum
@@ -69,10 +69,10 @@ github.com/Masterminds/sprig v2.22.0+incompatible/go.mod h1:y6hNFY5UBTIWBxnzTeuN
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/OpenPeeDeeP/depguard v1.0.1 h1:VlW4R6jmBIv3/u1JNlawEvJMM4J+dPORPaZasQee8Us=
github.com/OpenPeeDeeP/depguard v1.0.1/go.mod h1:xsIw86fROiiwelg+jB2uM9PiKihMMmUx/1V+TNhjQvM=
-github.com/Shopify/sarama v1.27.0 h1:tqo2zmyzPf1+gwTTwhI6W+EXDw4PVSczynpHKFtVAmo=
-github.com/Shopify/sarama v1.27.0/go.mod h1:aCdj6ymI8uyPEux1JJ9gcaDT6cinjGhNCAhs54taSUo=
-github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc=
-github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
+github.com/Shopify/sarama v1.30.1 h1:z47lP/5PBw2UVKf1lvfS5uWXaJws6ggk9PLnKEHtZiQ=
+github.com/Shopify/sarama v1.30.1/go.mod h1:hGgx05L/DiW8XYBXeJdKIN6V2QUy2H6JqME5VT1NLRw=
+github.com/Shopify/toxiproxy/v2 v2.1.6-0.20210914104332-15ea381dcdae h1:ePgznFqEG1v3AjMklnK8H7BSc++FDSo7xfK9K7Af+0Y=
+github.com/Shopify/toxiproxy/v2 v2.1.6-0.20210914104332-15ea381dcdae/go.mod h1:/cvHQkZ1fst0EmZnA5dFtiQdWCNCFYzb+uE2vqVgvx0=
github.com/StackExchange/wmi v1.2.1/go.mod h1:rcmrprowKIVzvc+NUiLncP2uuArMWLCbu9SBzvHz7e8=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
@@ -187,8 +187,8 @@ github.com/fatih/structtag v1.2.0 h1:/OdNE99OxoI/PqaW/SuSK9uxxT3f/tcSZgon/ssNSx4
github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
-github.com/frankban/quicktest v1.10.0 h1:Gfh+GAJZOAoKZsIZeZbdn2JF10kN1XHNvjsvQK8gVkE=
-github.com/frankban/quicktest v1.10.0/go.mod h1:ui7WezCLWMWxVWr1GETZY3smRy0G4KWq9vcPtJmFl7Y=
+github.com/frankban/quicktest v1.11.3 h1:8sXhOn0uLys67V8EsXLc6eszDs8VXWxL3iRvebPhedY=
+github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI=
@@ -277,9 +277,9 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS
github.com/golang/protobuf v1.5.1/go.mod h1:DopwsBzvsk0Fs44TXzsVbJyPhcCPeIwnvohx4u74HPM=
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
-github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
-github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA=
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
+github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golangci/check v0.0.0-20180506172741-cfe4005ccda2 h1:23T5iq8rbUYlhpt5DB4XJkc6BU31uODLD1o1gKvZmD0=
github.com/golangci/check v0.0.0-20180506172741-cfe4005ccda2/go.mod h1:k9Qvh+8juN+UKMCS/3jFtGICgW8O96FVaZsaxdzDkR4=
github.com/golangci/dupl v0.0.0-20180902072040-3e9179ac440a h1:w8hkcTqaFpzKqonE9uMCefW1WDie15eSP/4MssdenaM=
@@ -354,6 +354,8 @@ github.com/gordonklaus/ineffassign v0.0.0-20210225214923-2e10b2664254 h1:Nb2aRlC
github.com/gordonklaus/ineffassign v0.0.0-20210225214923-2e10b2664254/go.mod h1:M9mZEtGIsR1oDaZagNPNG9iq9n2HrhZ17dsXk73V3Lw=
github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75/go.mod h1:g2644b03hfBX9Ov0ZBDgXXens4rxSxmqFBbhvKv2yVA=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
+github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4=
+github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM=
github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gostaticanalysis/analysisutil v0.0.0-20190318220348-4088753ea4d3/go.mod h1:eEOZF4jCKGi+aprrirO9e7WKB3beBRtWgqGunKl6pKE=
@@ -426,8 +428,18 @@ github.com/imdario/mergo v0.3.4/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJ
github.com/imdario/mergo v0.3.8/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
+github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8=
+github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs=
+github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo=
+github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM=
github.com/jcmturner/gofork v1.0.0 h1:J7uCkflzTEhUZ64xqKnkDxq3kzc96ajM1Gli5ktUem8=
github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o=
+github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o=
+github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg=
+github.com/jcmturner/gokrb5/v8 v8.4.2 h1:6ZIM6b/JJN0X8UM43ZOM6Z4SJzla+a/u7scXFJzodkA=
+github.com/jcmturner/gokrb5/v8 v8.4.2/go.mod h1:sb+Xq/fTY5yktf/VxLsE3wlfPqQjp0aWNYyvBVK62bc=
+github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY=
+github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
github.com/jgautheron/goconst v1.5.1 h1:HxVbL1MhydKs8R8n/HE5NPvzfaYmQJA3o879lE4+WcM=
github.com/jgautheron/goconst v1.5.1/go.mod h1:aAosetZ5zaeC/2EfMeRswtxUFBpe2Hr7HzkgX4fanO4=
github.com/jhump/protoreflect v1.6.1/go.mod h1:RZQ/lnuN+zqeRVpQigTwO6o0AJUkxbnSnpuG7toUTG4=
@@ -461,18 +473,19 @@ github.com/kisielk/errcheck v1.6.0 h1:YTDO4pNy7AUN/021p+JGHycQyYNIyMoenM1YDVK6Rl
github.com/kisielk/errcheck v1.6.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0 h1:AV2c/EiW3KqPNT9ZKl07ehoAGi4C5/01Cfbblndcapg=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
-github.com/klauspost/compress v1.10.10/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
-github.com/klauspost/compress v1.13.5 h1:9O69jUPDcsT9fEm74W92rZL9FQY7rCdaXVneq+yyzl4=
github.com/klauspost/compress v1.13.5/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
+github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc=
+github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
-github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs=
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
+github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
+github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
@@ -574,7 +587,6 @@ github.com/nbutton23/zxcvbn-go v0.0.0-20210217022336-fa2cb2858354 h1:4kuARK6Y6Fx
github.com/nbutton23/zxcvbn-go v0.0.0-20210217022336-fa2cb2858354/go.mod h1:KSVJerMDfblTH7p5MZaTt+8zaT2iEk3AkVb9PQdZuE8=
github.com/newrelic/infra-integrations-sdk v3.6.8+incompatible h1:EBFBmBxNXpsTbDb5qpMC9QahgQrBWc+El+1JqwQVk24=
github.com/newrelic/infra-integrations-sdk v3.6.8+incompatible/go.mod h1:tMUHRMq6mJS0YyBnbWrTXAnREnQqC1AGO6Lu45u5xAM=
-github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/nishanths/exhaustive v0.2.3 h1:+ANTMqRNrqwInnP9aszg/0jDo+zbXa4x66U19Bx/oTk=
github.com/nishanths/exhaustive v0.2.3/go.mod h1:bhIX678Nx8inLM9PbpvK1yv6oGtoP8BfaIeMzgBNKvc=
@@ -614,8 +626,8 @@ github.com/pelletier/go-toml v1.9.4/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCko
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
github.com/phayes/checkstyle v0.0.0-20170904204023-bfd46e6a821d h1:CdDQnGF8Nq9ocOS/xlSptM1N3BbrA6/kmaep5ggwaIA=
github.com/phayes/checkstyle v0.0.0-20170904204023-bfd46e6a821d/go.mod h1:3OzsM7FXDQlpCiw2j81fOmAwQLnZnLGXVKUzeKQXIAw=
-github.com/pierrec/lz4 v2.5.2+incompatible h1:WCjObylUIOlKy/+7Abdn34TLIkXiA4UWUMhxq9m9ZXI=
-github.com/pierrec/lz4 v2.5.2+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
+github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM=
+github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
@@ -658,8 +670,8 @@ github.com/quasilyte/go-ruleguard/rules v0.0.0-20201231183845-9e62ed36efe1/go.mo
github.com/quasilyte/go-ruleguard/rules v0.0.0-20210428214800-545e0d2e0bf7/go.mod h1:4cgAphtvu7Ftv7vOT2ZOYhC6CvBxZixcasr8qIOTA50=
github.com/quasilyte/regex/syntax v0.0.0-20200407221936-30656e2c4a95 h1:L8QM9bvf68pVdQ3bCFZMDmnt9yqcMBro1pC7F+IPYMY=
github.com/quasilyte/regex/syntax v0.0.0-20200407221936-30656e2c4a95/go.mod h1:rlzQ04UMyJXu/aOvhd8qT+hvDrFpiwqp8MRXDY9szc0=
-github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 h1:MkV+77GLUNo5oJ0jf870itWm3D0Sjh7+Za9gazKc5LQ=
-github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
+github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM=
+github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
@@ -737,7 +749,6 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
-github.com/stretchr/testify v1.6.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
@@ -772,6 +783,7 @@ github.com/ultraware/whitespace v0.0.4 h1:If7Va4cM03mpgrNH9k49/VOicWpGoG70XPBFFO
github.com/ultraware/whitespace v0.0.4/go.mod h1:aVMh/gQve5Maj9hQ/hg+F75lr/X5A89uZnzAmWSineA=
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
+github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI=
github.com/uudashr/gocognit v1.0.5 h1:rrSex7oHr3/pPLQ0xoWq108XMU8s678FJcQ+aSfOHa4=
github.com/uudashr/gocognit v1.0.5/go.mod h1:wgYz0mitoKOTysqxTDMOUXg+Jb5SvtihkfmugIZYpEA=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
@@ -779,9 +791,11 @@ github.com/valyala/fasthttp v1.30.0/go.mod h1:2rsYD01CKFrjjsvFxx75KlEUNpWNBY9JWD
github.com/valyala/quicktemplate v1.7.0/go.mod h1:sqKJnoaOF88V07vkO+9FL8fb9uZg/VPSJnLYn+LmLk8=
github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc=
github.com/viki-org/dnscache v0.0.0-20130720023526-c70c1f23c5d8/go.mod h1:dniwbG03GafCjFohMDmz6Zc6oCuiqgH6tGNyXTkHzXE=
+github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
+github.com/xdg-go/scram v1.0.2/go.mod h1:1WAq6h33pAW+iRreB34OORO2Nf7qel3VV3fjBj+hCSs=
+github.com/xdg-go/stringprep v1.0.2/go.mod h1:8F9zXuvzgwmyT5DUm4GUfZGDdT3W+LCvS6+da4O5kxM=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
-github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/xdg/stringprep v1.0.1-0.20180714160509-73f8eece6fdc h1:vIp1tjhVogU0yBy7w96P027ewvNPeH6gzuNcoc+NReU=
github.com/xdg/stringprep v1.0.1-0.20180714160509-73f8eece6fdc/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f h1:J9EGpcZtP0E/raorCMxlFGSTBrsSlaDGf3jU/qvAE2c=
@@ -841,11 +855,12 @@ golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392/go.mod h1:/lpIB1dKB+9EgE3H3cr1v9wB50oz8l4C4h62xy7jSTY=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
-golang.org/x/crypto v0.0.0-20200510223506-06a226fb4e37/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
+golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8=
golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
+golang.org/x/crypto v0.0.0-20210920023735-84f357641f63/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 h1:7I4JAnoQBe7ZtJcBaYHi5UtiO8tQHbUSXxL+pnGRANg=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
@@ -920,7 +935,6 @@ golang.org/x/net v0.0.0-20200506145744-7e3656a0809f/go.mod h1:qpuaurCH72eLCgpAm/
golang.org/x/net v0.0.0-20200513185701-a91f0712d120/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
-golang.org/x/net v0.0.0-20200528225125-3c3fba18258b/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
@@ -935,8 +949,9 @@ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210510120150-4163338589ed/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
-golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d h1:20cMwl2fHAzkJMEA+8J4JgqBQcQGzbisXo31MIeenXI=
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
+golang.org/x/net v0.0.0-20210917221730-978cfadd31cf h1:R150MpwJIv1MpS0N/pc+NhTM8ajzvlmxlY5OYsrevXQ=
+golang.org/x/net v0.0.0-20210917221730-978cfadd31cf/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -1310,8 +1325,9 @@ gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLks
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
-gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
+gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/cheggaaa/pb.v1 v1.0.25/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw=
gopkg.in/cheggaaa/pb.v1 v1.0.28/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
@@ -1320,16 +1336,6 @@ gopkg.in/gcfg.v1 v1.2.3/go.mod h1:yesOnuUOFQAhST5vPY4nbZsb/huCgGGXlipJsBn0b3o=
gopkg.in/ini.v1 v1.62.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/ini.v1 v1.63.2 h1:tGK/CyBg7SMzb60vP1M03vNZ3VDu3wGQJwn7Sxi9r3c=
gopkg.in/ini.v1 v1.63.2/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
-gopkg.in/jcmturner/aescts.v1 v1.0.1 h1:cVVZBK2b1zY26haWB4vbBiZrfFQnfbTVrE3xZq6hrEw=
-gopkg.in/jcmturner/aescts.v1 v1.0.1/go.mod h1:nsR8qBOg+OucoIW+WMhB3GspUQXq9XorLnQb9XtvcOo=
-gopkg.in/jcmturner/dnsutils.v1 v1.0.1 h1:cIuC1OLRGZrld+16ZJvvZxVJeKPsvd5eUIvxfoN5hSM=
-gopkg.in/jcmturner/dnsutils.v1 v1.0.1/go.mod h1:m3v+5svpVOhtFAP/wSz+yzh4Mc0Fg7eRhxkJMWSIz9Q=
-gopkg.in/jcmturner/goidentity.v3 v3.0.0 h1:1duIyWiTaYvVx3YX2CYtpJbUFd7/UuPYCfgXtQ3VTbI=
-gopkg.in/jcmturner/goidentity.v3 v3.0.0/go.mod h1:oG2kH0IvSYNIu80dVAyu/yoefjq1mNfM5bm88whjWx4=
-gopkg.in/jcmturner/gokrb5.v7 v7.5.0 h1:a9tsXlIDD9SKxotJMK3niV7rPZAJeX2aD/0yg3qlIrg=
-gopkg.in/jcmturner/gokrb5.v7 v7.5.0/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM=
-gopkg.in/jcmturner/rpc.v1 v1.1.0 h1:QHIUxTX1ISuAv9dD2wJ9HWQVuWDX/Zc0PfeC2tjc4rU=
-gopkg.in/jcmturner/rpc.v1 v1.1.0/go.mod h1:YIdkC4XfD6GXbzje11McwsDuOlZQSb9W4vfLvuNnlv8=
gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
@@ -1346,7 +1352,6 @@ gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
-gopkg.in/yaml.v3 v3.0.0-20200601152816-913338de1bd2/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
diff --git a/src/connection/connection.go b/src/connection/connection.go
index 4f9652e9..422e53c0 100644
--- a/src/connection/connection.go
+++ b/src/connection/connection.go
@@ -1,4 +1,5 @@
-//go:generate mockery --name=Client --name=SaramaBroker
+//go:generate mockery --name=Client
+//go:generate mockery --name=SaramaBroker
// Package connection implements connection code
package connection
@@ -23,24 +24,7 @@ import (
// Client is a wrapper around sarama.Client so that we can generate mocks
// See sarama.Client for documentation
type Client interface {
- Config() *sarama.Config
- Controller() (*sarama.Broker, error)
- Brokers() []*sarama.Broker
- Topics() ([]string, error)
- Partitions(topic string) ([]int32, error)
- WritablePartitions(topic string) ([]int32, error)
- Leader(topic string, partitionID int32) (*sarama.Broker, error)
- Replicas(topic string, partitionID int32) ([]int32, error)
- InSyncReplicas(topic string, partitionID int32) ([]int32, error)
- OfflineReplicas(topic string, partitionID int32) ([]int32, error)
- RefreshMetadata(topics ...string) error
- GetOffset(topic string, partitionID int32, time int64) (int64, error)
- Coordinator(consumerGroup string) (*sarama.Broker, error)
- RefreshCoordinator(consumerGroup string) error
- RefreshController() (*sarama.Broker, error)
- InitProducerID() (*sarama.InitProducerIDResponse, error)
- Close() error
- Closed() bool
+ sarama.Client
}
// SaramaBroker is an interface over sarama.Broker for mocking
diff --git a/src/connection/mocks/Client.go b/src/connection/mocks/Client.go
index 168e7ad4..0c02f554 100644
--- a/src/connection/mocks/Client.go
+++ b/src/connection/mocks/Client.go
@@ -1,15 +1,40 @@
-// Code generated by mockery v1.0.0. DO NOT EDIT.
+// Code generated by mockery v2.9.4. DO NOT EDIT.
package mocks
-import mock "github.com/stretchr/testify/mock"
-import sarama "github.com/Shopify/sarama"
+import (
+ sarama "github.com/Shopify/sarama"
+ mock "github.com/stretchr/testify/mock"
+)
// Client is an autogenerated mock type for the Client type
type Client struct {
mock.Mock
}
+// Broker provides a mock function with given fields: brokerID
+func (_m *Client) Broker(brokerID int32) (*sarama.Broker, error) {
+ ret := _m.Called(brokerID)
+
+ var r0 *sarama.Broker
+ if rf, ok := ret.Get(0).(func(int32) *sarama.Broker); ok {
+ r0 = rf(brokerID)
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(*sarama.Broker)
+ }
+ }
+
+ var r1 error
+ if rf, ok := ret.Get(1).(func(int32) error); ok {
+ r1 = rf(brokerID)
+ } else {
+ r1 = ret.Error(1)
+ }
+
+ return r0, r1
+}
+
// Brokers provides a mock function with given fields:
func (_m *Client) Brokers() []*sarama.Broker {
ret := _m.Called()
@@ -252,13 +277,13 @@ func (_m *Client) Partitions(topic string) ([]int32, error) {
return r0, r1
}
-// RefreshCoordinator provides a mock function with given fields: consumerGroup
-func (_m *Client) RefreshCoordinator(consumerGroup string) error {
- ret := _m.Called(consumerGroup)
+// RefreshBrokers provides a mock function with given fields: addrs
+func (_m *Client) RefreshBrokers(addrs []string) error {
+ ret := _m.Called(addrs)
var r0 error
- if rf, ok := ret.Get(0).(func(string) error); ok {
- r0 = rf(consumerGroup)
+ if rf, ok := ret.Get(0).(func([]string) error); ok {
+ r0 = rf(addrs)
} else {
r0 = ret.Error(0)
}
@@ -266,7 +291,7 @@ func (_m *Client) RefreshCoordinator(consumerGroup string) error {
return r0
}
-// RefreshController provides a mock function with given fields
+// RefreshController provides a mock function with given fields:
func (_m *Client) RefreshController() (*sarama.Broker, error) {
ret := _m.Called()
@@ -289,6 +314,20 @@ func (_m *Client) RefreshController() (*sarama.Broker, error) {
return r0, r1
}
+// RefreshCoordinator provides a mock function with given fields: consumerGroup
+func (_m *Client) RefreshCoordinator(consumerGroup string) error {
+ ret := _m.Called(consumerGroup)
+
+ var r0 error
+ if rf, ok := ret.Get(0).(func(string) error); ok {
+ r0 = rf(consumerGroup)
+ } else {
+ r0 = ret.Error(0)
+ }
+
+ return r0
+}
+
// RefreshMetadata provides a mock function with given fields: topics
func (_m *Client) RefreshMetadata(topics ...string) error {
_va := make([]interface{}, len(topics))
diff --git a/src/connection/mocks/SaramaBroker.go b/src/connection/mocks/SaramaBroker.go
index 12298488..fdd636bf 100644
--- a/src/connection/mocks/SaramaBroker.go
+++ b/src/connection/mocks/SaramaBroker.go
@@ -1,4 +1,4 @@
-// Code generated by mockery 2.7.4. DO NOT EDIT.
+// Code generated by mockery v2.9.4. DO NOT EDIT.
package mocks
diff --git a/tests/integration/consumer-producer/Dockerfile b/tests/integration/consumer-producer/Dockerfile
new file mode 100644
index 00000000..3991bcce
--- /dev/null
+++ b/tests/integration/consumer-producer/Dockerfile
@@ -0,0 +1,8 @@
+FROM maven
+COPY src src
+COPY pom.xml pom.xml
+
+RUN mvn clean compile package
+
+WORKDIR /
+ENTRYPOINT ["java","-Dcom.sun.management.jmxremote","-Djava.rmi.server.hostname=localhost","-Dcom.sun.management.jmxremote.port=1088","-Dcom.sun.management.jmxremote.rmi.port=1088","-Dcom.sun.management.jmxremote.local.only=false","-Dcom.sun.management.jmxremote.authenticate=false","-Dcom.sun.management.jmxremote.ssl=false","-jar","./target/kafka-dummy-1.0-jar-with-dependencies.jar"]
diff --git a/tests/integration/consumer-producer/pom.xml b/tests/integration/consumer-producer/pom.xml
new file mode 100644
index 00000000..fef0883e
--- /dev/null
+++ b/tests/integration/consumer-producer/pom.xml
@@ -0,0 +1,67 @@
+
+
+ 4.0.0
+ 1
+ kafka-dummy
+ 1.0
+
+
+ org.apache.kafka
+ kafka-clients
+ 0.10.0.0
+
+
+ org.apache.kafka
+ kafka-streams
+ 0.10.0.0
+
+
+ org.apache.flink
+ flink-connector-kafka-0.10_2.11
+ 1.4.0
+
+
+ org.apache.flink
+ flink-streaming-java_2.11
+ 1.4.0
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-assembly-plugin
+
+
+ package
+
+ single
+
+
+
+
+
+ kafkaDummy.Factory
+
+
+
+
+ jar-with-dependencies
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+
+
+ 1.8
+
+
+
+
+
diff --git a/tests/integration/consumer-producer/src/main/java/Factory.java b/tests/integration/consumer-producer/src/main/java/Factory.java
new file mode 100644
index 00000000..77deb887
--- /dev/null
+++ b/tests/integration/consumer-producer/src/main/java/Factory.java
@@ -0,0 +1,13 @@
+package kafkaDummy;
+
+public class Factory {
+ public static void main(String[] args) {
+ if (args[0].equals("producer")) {
+ SimpleProducer.main(args);
+ } else if (args[0].equals("consumer")) {
+ SimpleConsumer.main(args);
+ } else {
+ System.out.println("First argument should be consumer/producer" + args[0]);
+ }
+ }
+}
diff --git a/tests/integration/consumer-producer/src/main/java/SimpleConsumer.java b/tests/integration/consumer-producer/src/main/java/SimpleConsumer.java
new file mode 100644
index 00000000..6bd1b787
--- /dev/null
+++ b/tests/integration/consumer-producer/src/main/java/SimpleConsumer.java
@@ -0,0 +1,43 @@
+package kafkaDummy;
+
+import java.util.Properties;
+import java.util.Arrays;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+public class SimpleConsumer {
+ public static void main(String[] args) {
+ if(args.length < 4){
+ System.out.println("Usage: consumer ");
+ return;
+ }
+
+ String bootstrapBroker = args[1].toString();
+ String topic = args[2].toString();
+ String group = args[3].toString();
+
+ Properties props = new Properties();
+ props.put("bootstrap.servers", bootstrapBroker);
+ props.put("group.id", group);
+ props.put("enable.auto.commit", "true");
+ props.put("auto.commit.interval.ms", "1000");
+ props.put("session.timeout.ms", "30000");
+ props.put("key.deserializer",
+ "org.apache.kafka.common.serialization.StringDeserializer");
+ props.put("value.deserializer",
+ "org.apache.kafka.common.serialization.StringDeserializer");
+
+ KafkaConsumer consumer = new KafkaConsumer(props);
+
+ consumer.subscribe(Arrays.asList(topic));
+ System.out.println("Subscribed to topic " + topic);
+
+ while (true) {
+ ConsumerRecords records = consumer.poll(10);
+ for (ConsumerRecord record : records) {
+ System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
+ }
+ }
+ }
+}
diff --git a/tests/integration/consumer-producer/src/main/java/SimpleProducer.java b/tests/integration/consumer-producer/src/main/java/SimpleProducer.java
new file mode 100644
index 00000000..adc3f61f
--- /dev/null
+++ b/tests/integration/consumer-producer/src/main/java/SimpleProducer.java
@@ -0,0 +1,57 @@
+package kafkaDummy;
+
+import java.util.Properties;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+public class SimpleProducer {
+ public static void main(String[] args) {
+ if(args.length < 3){
+ System.out.println("Usage: producer ");
+ return;
+ }
+
+ String bootstrapBroker = args[1].toString();
+ String topicName = args[2].toString();
+
+ Properties props = new Properties();
+ props.put("bootstrap.servers", bootstrapBroker);
+ props.put("acks", "all");
+ props.put("retries", 0);
+ props.put("batch.size", 16384);
+ props.put("linger.ms", 1);
+ props.put("buffer.memory", 33554432);
+ props.put("key.serializer",
+ "org.apache.kafka.common.serialization.StringSerializer");
+ props.put("value.serializer",
+ "org.apache.kafka.common.serialization.StringSerializer");
+
+ Producer producer = new KafkaProducer
+ (props);
+
+ int msg = 0;
+ while(true) {
+ producer.send(new ProducerRecord(topicName,
+ Integer.toString(msg), Integer.toString(msg)));
+ System.out.println("Message sent successfully");
+ msg++;
+ wait(2000);
+ }
+
+ // never reached
+ // producer.close();
+ }
+
+ public static void wait(int ms)
+ {
+ try
+ {
+ Thread.sleep(ms);
+ }
+ catch(InterruptedException ex)
+ {
+ Thread.currentThread().interrupt();
+ }
+ }
+}
diff --git a/tests/integration/docker-compose.yml b/tests/integration/docker-compose.yml
index 4dc18964..853b64c6 100644
--- a/tests/integration/docker-compose.yml
+++ b/tests/integration/docker-compose.yml
@@ -65,6 +65,37 @@ services:
networks:
- kfk
+ start_3party_dependencies:
+ image: dadarek/wait-for-dependencies
+ environment:
+ - SLEEP_LENGTH=5
+ - TIMEOUT_LENGTH=120
+ networks:
+ - kfk
+ depends_on:
+ - kafka1
+ command: kafka1:9092
+
+ kafka_dummy_consumer:
+ container_name: kafka_dummy_consumer
+ build:
+ context: consumer-producer
+ command: ["consumer","kafka1:9092","topicA","groupA"]
+ networks:
+ - kfk
+ depends_on:
+ - start_3party_dependencies
+
+ kafka_dummy_producer:
+ container_name: kafka_dummy_producer
+ build:
+ context: consumer-producer
+ command: ["producer","kafka1:9092","topicA"]
+ networks:
+ - kfk
+ depends_on:
+ - start_3party_dependencies
+
nri-kafka:
container_name: integration_nri_kafka_1
build:
diff --git a/tests/integration/json-schema-files/kafka-schema-consumer-offset.json b/tests/integration/json-schema-files/kafka-schema-consumer-offset.json
new file mode 100644
index 00000000..9bd4f296
--- /dev/null
+++ b/tests/integration/json-schema-files/kafka-schema-consumer-offset.json
@@ -0,0 +1,328 @@
+{
+ "$schema": "http://json-schema.org/draft-07/schema",
+ "type": "object",
+ "required": [
+ "name",
+ "protocol_version",
+ "integration_version",
+ "data"
+ ],
+ "properties": {
+ "name": {
+ "minLength": 1,
+ "pattern": "^com.newrelic.kafka$",
+ "type": "string"
+ },
+ "protocol_version": {
+ "minLength": 1,
+ "pattern": "^3$",
+ "type": "string"
+ },
+ "integration_version": {
+ "minLength": 1,
+ "pattern": "^[0-9]+.[0-9]+.[0-9]+$",
+ "type": "string"
+ },
+ "data": {
+ "type": "array",
+ "uniqueItems": true,
+ "minItems": 3,
+ "items": {
+ "anyOf": [
+ {
+ "type": "object",
+ "required": [
+ "entity",
+ "metrics",
+ "inventory",
+ "events"
+ ],
+ "properties": {
+ "entity": {
+ "type": "object",
+ "required": [
+ "name",
+ "type",
+ "id_attributes"
+ ],
+ "properties": {
+ "name": {
+ "minLength": 1,
+ "type": "string"
+ },
+ "type": {
+ "minLength": 1,
+ "pattern": "^ka-partition-consumer*",
+ "type": "string"
+ },
+ "id_attributes": {
+ "type": "array",
+ "uniqueItems": true,
+ "items": {
+ "anyOf": [
+ {
+ "type": "object",
+ "required": [
+ "Key",
+ "Value"
+ ],
+ "properties": {
+ "Key": {
+ "type": "string"
+ },
+ "Value": {
+ "type": "string"
+ }
+ }
+ }
+ ]
+ }
+ }
+ }
+ },
+ "metrics": {
+ "type": "array",
+ "uniqueItems": true,
+ "minItems": 1,
+ "items": {
+ "anyOf": [
+ {
+ "type": "object",
+ "required": [
+ "clientHost",
+ "clientID",
+ "clusterName",
+ "consumer.hwm",
+ "consumer.lag",
+ "consumer.offset",
+ "consumerGroup",
+ "event_type",
+ "partition",
+ "topic"
+ ],
+ "properties": {
+ "clientHost": {
+ "type": "string"
+ },
+ "clientID": {
+ "type": "string"
+ },
+ "clusterName": {
+ "type": "string"
+ },
+ "consumer.hwm": {
+ "type": "integer"
+ },
+ "consumer.lag": {
+ "type": "integer"
+ },
+ "consumer.offset": {
+ "type": "integer"
+ },
+ "consumerGroup": {
+ "type": "string"
+ },
+ "event_type": {
+ "type": "string"
+ },
+ "partition": {
+ "type": "string"
+ },
+ "topic": {
+ "type": "string"
+ }
+ }
+ }
+ ]
+ }
+ },
+ "inventory": {
+ "type": "object",
+ "required": []
+ },
+ "events": {
+ "type": "array",
+ "uniqueItems": true,
+ "items": {}
+ }
+ }
+ },
+ {
+ "type": "object",
+ "required": [
+ "entity",
+ "metrics",
+ "inventory",
+ "events"
+ ],
+ "properties": {
+ "entity": {
+ "type": "object",
+ "required": [
+ "name",
+ "type",
+ "id_attributes"
+ ],
+ "properties": {
+ "name": {
+ "minLength": 1,
+ "type": "string"
+ },
+ "type": {
+ "minLength": 1,
+ "pattern": "^ka-consumer-group*",
+ "type": "string"
+ },
+ "id_attributes": {
+ "type": "array",
+ "uniqueItems": true,
+ "items": {
+ "type": "object",
+ "required": [
+ "Key",
+ "Value"
+ ],
+ "properties": {
+ "Key": {
+ "type": "string"
+ },
+ "Value": {
+ "type": "string"
+ }
+ }
+ }
+ }
+ }
+ },
+ "metrics": {
+ "type": "array",
+ "uniqueItems": true,
+ "minItems": 1,
+ "items": {
+ "type": "object",
+ "required": [
+ "clusterName",
+ "consumerGroup",
+ "consumerGroup.maxLag",
+ "consumerGroup.totalLag",
+ "event_type"
+ ],
+ "properties": {
+ "clusterName": {
+ "type": "string"
+ },
+ "consumerGroup": {
+ "type": "string"
+ },
+ "consumerGroup.maxLag": {
+ "type": "integer"
+ },
+ "consumerGroup.totalLag": {
+ "type": "integer"
+ },
+ "event_type": {
+ "type": "string"
+ }
+ }
+ }
+ },
+ "inventory": {
+ "type": "object",
+ "required": []
+ },
+ "events": {
+ "type": "array",
+ "uniqueItems": true
+ }
+ }
+ },
+ {
+ "type": "object",
+ "required": [
+ "entity",
+ "metrics",
+ "inventory",
+ "events"
+ ],
+ "properties": {
+ "entity": {
+ "type": "object",
+ "required": [
+ "name",
+ "type",
+ "id_attributes"
+ ],
+ "properties": {
+ "name": {
+ "minLength": 1,
+ "type": "string"
+ },
+ "type": {
+ "minLength": 1,
+ "pattern": "^ka-consumer*",
+ "type": "string"
+ },
+ "id_attributes": {
+ "type": "array",
+ "uniqueItems": true,
+ "items": {
+ "type": "object",
+ "required": [
+ "Key",
+ "Value"
+ ],
+ "properties": {
+ "Key": {
+ "type": "string"
+ },
+ "Value": {
+ "type": "string"
+ }
+ }
+ }
+ }
+ }
+ },
+ "metrics": {
+ "type": "array",
+ "uniqueItems": true,
+ "minItems": 1,
+ "items": {
+ "type": "object",
+ "required": [
+ "clientID",
+ "clusterName",
+ "consumer.totalLag",
+ "event_type"
+ ],
+ "properties": {
+ "clientID": {
+ "type": "string"
+ },
+ "clusterName": {
+ "type": "string"
+ },
+ "consumer.totalLag": {
+ "type": "integer"
+ },
+ "event_type": {
+ "type": "string"
+ }
+ }
+ }
+ },
+ "inventory": {
+ "type": "object",
+ "required": []
+ },
+ "events": {
+ "type": "array",
+ "uniqueItems": true
+ }
+ }
+ }
+ ]
+ }
+ }
+ }
+}
diff --git a/tests/integration/json-schema-files/kafka-schema-inventory.json b/tests/integration/json-schema-files/kafka-schema-inventory.json
index cf6b7617..f24ef687 100644
--- a/tests/integration/json-schema-files/kafka-schema-inventory.json
+++ b/tests/integration/json-schema-files/kafka-schema-inventory.json
@@ -8558,6 +8558,399 @@
}
}
},
+ {
+ "type": "object",
+ "required": [
+ "entity",
+ "metrics",
+ "inventory",
+ "events"
+ ],
+ "properties": {
+ "entity": {
+ "type": "object",
+ "required": [
+ "name",
+ "type",
+ "id_attributes"
+ ],
+ "properties": {
+ "name": {
+ "minLength": 1,
+ "pattern": "^__consumer_offsets$",
+ "type": "string"
+ },
+ "type": {
+ "minLength": 1,
+ "pattern": "^ka-topic$",
+ "type": "string"
+ },
+ "id_attributes": {
+ "type": "array",
+ "uniqueItems": true,
+ "items": {
+ "type": "object",
+ "required": [
+ "Key",
+ "Value"
+ ],
+ "properties": {
+ "Key": {
+ "type": "string"
+ },
+ "Value": {
+ "type": "string"
+ }
+ }
+ }
+ }
+ }
+ },
+ "metrics": {
+ "type": "array",
+ "uniqueItems": true
+ },
+ "inventory": {
+ "type": "object",
+ "required": [
+ "topic.cleanup.policy",
+ "topic.compression.type",
+ "topic.delete.retention.ms",
+ "topic.file.delete.delay.ms",
+ "topic.flush.messages",
+ "topic.flush.ms",
+ "topic.follower.replication.throttled.replicas",
+ "topic.index.interval.bytes",
+ "topic.leader.replication.throttled.replicas",
+ "topic.max.compaction.lag.ms",
+ "topic.max.message.bytes",
+ "topic.message.downconversion.enable",
+ "topic.message.format.version",
+ "topic.message.timestamp.difference.max.ms",
+ "topic.message.timestamp.type",
+ "topic.min.cleanable.dirty.ratio",
+ "topic.min.compaction.lag.ms",
+ "topic.min.insync.replicas",
+ "topic.partitionScheme",
+ "topic.preallocate",
+ "topic.retention.bytes",
+ "topic.retention.ms",
+ "topic.segment.bytes",
+ "topic.segment.index.bytes",
+ "topic.segment.jitter.ms",
+ "topic.segment.ms",
+ "topic.unclean.leader.election.enable"
+ ],
+ "properties": {
+ "topic.cleanup.policy": {
+ "type": "object",
+ "required": [
+ "value"
+ ],
+ "properties": {
+ "value": {
+ "type": "string"
+ }
+ }
+ },
+ "topic.compression.type": {
+ "type": "object",
+ "required": [
+ "value"
+ ],
+ "properties": {
+ "value": {
+ "type": "string"
+ }
+ }
+ },
+ "topic.delete.retention.ms": {
+ "type": "object",
+ "required": [
+ "value"
+ ],
+ "properties": {
+ "value": {
+ "type": "string"
+ }
+ }
+ },
+ "topic.file.delete.delay.ms": {
+ "type": "object",
+ "required": [
+ "value"
+ ],
+ "properties": {
+ "value": {
+ "type": "string"
+ }
+ }
+ },
+ "topic.flush.messages": {
+ "type": "object",
+ "required": [
+ "value"
+ ],
+ "properties": {
+ "value": {
+ "type": "string"
+ }
+ }
+ },
+ "topic.flush.ms": {
+ "type": "object",
+ "required": [
+ "value"
+ ],
+ "properties": {
+ "value": {
+ "type": "string"
+ }
+ }
+ },
+ "topic.follower.replication.throttled.replicas": {
+ "type": "object",
+ "required": [
+ "value"
+ ],
+ "properties": {
+ "value": {
+ "type": "string"
+ }
+ }
+ },
+ "topic.index.interval.bytes": {
+ "type": "object",
+ "required": [
+ "value"
+ ],
+ "properties": {
+ "value": {
+ "type": "string"
+ }
+ }
+ },
+ "topic.leader.replication.throttled.replicas": {
+ "type": "object",
+ "required": [
+ "value"
+ ],
+ "properties": {
+ "value": {
+ "type": "string"
+ }
+ }
+ },
+ "topic.max.compaction.lag.ms": {
+ "type": "object",
+ "required": [
+ "value"
+ ],
+ "properties": {
+ "value": {
+ "type": "string"
+ }
+ }
+ },
+ "topic.max.message.bytes": {
+ "type": "object",
+ "required": [
+ "value"
+ ],
+ "properties": {
+ "value": {
+ "type": "string"
+ }
+ }
+ },
+ "topic.message.downconversion.enable": {
+ "type": "object",
+ "required": [
+ "value"
+ ],
+ "properties": {
+ "value": {
+ "type": "string"
+ }
+ }
+ },
+ "topic.message.format.version": {
+ "type": "object",
+ "required": [
+ "value"
+ ],
+ "properties": {
+ "value": {
+ "type": "string"
+ }
+ }
+ },
+ "topic.message.timestamp.difference.max.ms": {
+ "type": "object",
+ "required": [
+ "value"
+ ],
+ "properties": {
+ "value": {
+ "type": "string"
+ }
+ }
+ },
+ "topic.message.timestamp.type": {
+ "type": "object",
+ "required": [
+ "value"
+ ],
+ "properties": {
+ "value": {
+ "type": "string"
+ }
+ }
+ },
+ "topic.min.cleanable.dirty.ratio": {
+ "type": "object",
+ "required": [
+ "value"
+ ],
+ "properties": {
+ "value": {
+ "type": "string"
+ }
+ }
+ },
+ "topic.min.compaction.lag.ms": {
+ "type": "object",
+ "required": [
+ "value"
+ ],
+ "properties": {
+ "value": {
+ "type": "string"
+ }
+ }
+ },
+ "topic.min.insync.replicas": {
+ "type": "object",
+ "required": [
+ "value"
+ ],
+ "properties": {
+ "value": {
+ "type": "string"
+ }
+ }
+ },
+ "topic.partitionScheme": {
+ "type": "object",
+ "required": [
+ "Number of Partitions",
+ "Replication Factor"
+ ],
+ "properties": {
+ "Number of Partitions": {
+ "type": "integer"
+ },
+ "Replication Factor": {
+ "type": "integer"
+ }
+ }
+ },
+ "topic.preallocate": {
+ "type": "object",
+ "required": [
+ "value"
+ ],
+ "properties": {
+ "value": {
+ "type": "string"
+ }
+ }
+ },
+ "topic.retention.bytes": {
+ "type": "object",
+ "required": [
+ "value"
+ ],
+ "properties": {
+ "value": {
+ "type": "string"
+ }
+ }
+ },
+ "topic.retention.ms": {
+ "type": "object",
+ "required": [
+ "value"
+ ],
+ "properties": {
+ "value": {
+ "type": "string"
+ }
+ }
+ },
+ "topic.segment.bytes": {
+ "type": "object",
+ "required": [
+ "value"
+ ],
+ "properties": {
+ "value": {
+ "type": "string"
+ }
+ }
+ },
+ "topic.segment.index.bytes": {
+ "type": "object",
+ "required": [
+ "value"
+ ],
+ "properties": {
+ "value": {
+ "type": "string"
+ }
+ }
+ },
+ "topic.segment.jitter.ms": {
+ "type": "object",
+ "required": [
+ "value"
+ ],
+ "properties": {
+ "value": {
+ "type": "string"
+ }
+ }
+ },
+ "topic.segment.ms": {
+ "type": "object",
+ "required": [
+ "value"
+ ],
+ "properties": {
+ "value": {
+ "type": "string"
+ }
+ }
+ },
+ "topic.unclean.leader.election.enable": {
+ "type": "object",
+ "required": [
+ "value"
+ ],
+ "properties": {
+ "value": {
+ "type": "string"
+ }
+ }
+ }
+ }
+ },
+ "events": {
+ "type": "array",
+ "uniqueItems": true
+ }
+ }
+ },
{
"type": "object",
"required": [
diff --git a/tests/integration/json-schema-files/kafka-schema-metrics.json b/tests/integration/json-schema-files/kafka-schema-metrics.json
index fc6e8564..a5a6ad73 100644
--- a/tests/integration/json-schema-files/kafka-schema-metrics.json
+++ b/tests/integration/json-schema-files/kafka-schema-metrics.json
@@ -806,6 +806,104 @@
}
}
},
+ {
+ "type": "object",
+ "required": [
+ "entity",
+ "metrics",
+ "inventory",
+ "events"
+ ],
+ "properties": {
+ "entity": {
+ "type": "object",
+ "required": [
+ "name",
+ "type",
+ "id_attributes"
+ ],
+ "properties": {
+ "name": {
+ "minLength": 1,
+ "pattern": "^__consumer_offsets*",
+ "type": "string"
+ },
+ "type": {
+ "minLength": 1,
+ "pattern": "^ka-topic$",
+ "type": "string"
+ },
+ "id_attributes": {
+ "type": "array",
+ "uniqueItems": true,
+ "items": {
+ "type": "object",
+ "required": [
+ "Key",
+ "Value"
+ ],
+ "properties": {
+ "Key": {
+ "type": "string"
+ },
+ "Value": {
+ "type": "string"
+ }
+ }
+ }
+ }
+ }
+ },
+ "metrics": {
+ "type": "array",
+ "uniqueItems": true,
+ "items": {
+ "type": "object",
+ "required": [
+ "clusterName",
+ "displayName",
+ "entityName",
+ "event_type",
+ "topic.partitionsWithNonPreferredLeader",
+ "topic.respondsToMetadataRequests",
+ "topic.underReplicatedPartitions"
+ ],
+ "properties": {
+ "clusterName": {
+ "type": "string"
+ },
+ "displayName": {
+ "type": "string"
+ },
+ "entityName": {
+ "type": "string"
+ },
+ "event_type": {
+ "type": "string"
+ },
+ "topic.partitionsWithNonPreferredLeader": {
+ "type": "integer"
+ },
+ "topic.respondsToMetadataRequests": {
+ "type": "integer"
+ },
+ "topic.underReplicatedPartitions": {
+ "type": "integer"
+ }
+ }
+ }
+ },
+ "inventory": {
+ "type": "object",
+ "required": [],
+ "properties": {}
+ },
+ "events": {
+ "type": "array",
+ "uniqueItems": true
+ }
+ }
+ },
{
"type": "object",
"required": [
diff --git a/tests/integration/json-schema-files/kafka-schema-only-local.json b/tests/integration/json-schema-files/kafka-schema-only-local.json
index 8c0c4662..21290375 100644
--- a/tests/integration/json-schema-files/kafka-schema-only-local.json
+++ b/tests/integration/json-schema-files/kafka-schema-only-local.json
@@ -2917,6 +2917,104 @@
}
}
},
+ {
+ "type": "object",
+ "required": [
+ "entity",
+ "metrics",
+ "inventory",
+ "events"
+ ],
+ "properties": {
+ "entity": {
+ "type": "object",
+ "required": [
+ "name",
+ "type",
+ "id_attributes"
+ ],
+ "properties": {
+ "name": {
+ "minLength": 1,
+ "pattern": "^__consumer_offsets*",
+ "type": "string"
+ },
+ "type": {
+ "minLength": 1,
+ "pattern": "^ka-topic$",
+ "type": "string"
+ },
+ "id_attributes": {
+ "type": "array",
+ "uniqueItems": true,
+ "items": {
+ "type": "object",
+ "required": [
+ "Key",
+ "Value"
+ ],
+ "properties": {
+ "Key": {
+ "type": "string"
+ },
+ "Value": {
+ "type": "string"
+ }
+ }
+ }
+ }
+ }
+ },
+ "metrics": {
+ "type": "array",
+ "uniqueItems": true,
+ "items": {
+ "type": "object",
+ "required": [
+ "clusterName",
+ "displayName",
+ "entityName",
+ "event_type",
+ "topic.partitionsWithNonPreferredLeader",
+ "topic.respondsToMetadataRequests",
+ "topic.underReplicatedPartitions"
+ ],
+ "properties": {
+ "clusterName": {
+ "type": "string"
+ },
+ "displayName": {
+ "type": "string"
+ },
+ "entityName": {
+ "type": "string"
+ },
+ "event_type": {
+ "type": "string"
+ },
+ "topic.partitionsWithNonPreferredLeader": {
+ "type": "integer"
+ },
+ "topic.respondsToMetadataRequests": {
+ "type": "integer"
+ },
+ "topic.underReplicatedPartitions": {
+ "type": "integer"
+ }
+ }
+ }
+ },
+ "inventory": {
+ "type": "object",
+ "required": [],
+ "properties": {}
+ },
+ "events": {
+ "type": "array",
+ "uniqueItems": true
+ }
+ }
+ },
{
"type": "object",
"required": [
diff --git a/tests/integration/json-schema-files/kafka-schema.json b/tests/integration/json-schema-files/kafka-schema.json
index 931145d5..66494ee9 100644
--- a/tests/integration/json-schema-files/kafka-schema.json
+++ b/tests/integration/json-schema-files/kafka-schema.json
@@ -8693,6 +8693,104 @@
}
}
},
+ {
+ "type": "object",
+ "required": [
+ "entity",
+ "metrics",
+ "inventory",
+ "events"
+ ],
+ "properties": {
+ "entity": {
+ "type": "object",
+ "required": [
+ "name",
+ "type",
+ "id_attributes"
+ ],
+ "properties": {
+ "name": {
+ "minLength": 1,
+ "pattern": "^__consumer_offsets*",
+ "type": "string"
+ },
+ "type": {
+ "minLength": 1,
+ "pattern": "^ka-topic$",
+ "type": "string"
+ },
+ "id_attributes": {
+ "type": "array",
+ "uniqueItems": true,
+ "items": {
+ "type": "object",
+ "required": [
+ "Key",
+ "Value"
+ ],
+ "properties": {
+ "Key": {
+ "type": "string"
+ },
+ "Value": {
+ "type": "string"
+ }
+ }
+ }
+ }
+ }
+ },
+ "metrics": {
+ "type": "array",
+ "uniqueItems": true,
+ "items": {
+ "type": "object",
+ "required": [
+ "clusterName",
+ "displayName",
+ "entityName",
+ "event_type",
+ "topic.partitionsWithNonPreferredLeader",
+ "topic.respondsToMetadataRequests",
+ "topic.underReplicatedPartitions"
+ ],
+ "properties": {
+ "clusterName": {
+ "type": "string"
+ },
+ "displayName": {
+ "type": "string"
+ },
+ "entityName": {
+ "type": "string"
+ },
+ "event_type": {
+ "type": "string"
+ },
+ "topic.partitionsWithNonPreferredLeader": {
+ "type": "integer"
+ },
+ "topic.respondsToMetadataRequests": {
+ "type": "integer"
+ },
+ "topic.underReplicatedPartitions": {
+ "type": "integer"
+ }
+ }
+ }
+ },
+ "inventory": {
+ "type": "object",
+ "required": [],
+ "properties": {}
+ },
+ "events": {
+ "type": "array",
+ "uniqueItems": true
+ }
+ }
+ },
{
"type": "object",
"required": [
diff --git a/tests/integration/kafka_test.go b/tests/integration/kafka_test.go
index b58166c0..af63c9e6 100644
--- a/tests/integration/kafka_test.go
+++ b/tests/integration/kafka_test.go
@@ -19,8 +19,8 @@ import (
)
const (
- BROKER_CONN_MAX_RETRIES = 10
- ENSURE_TOPICS_MAX_RETRIES = 20
+ BROKER_CONN_MAX_RETRIES = 60
+ ENSURE_TOPICS_MAX_RETRIES = 60
KAFKA1_PORT = "19092"
BROKERS_IN_CLUSTER = 3
NUMBER_OF_TOPICS = 3
@@ -29,7 +29,7 @@ const (
var (
iName = "kafka"
- topicNames = []string{"topicA", "topicB", "topicC"}
+ topicNames = []string{"topicA", "topicB", "topicC", "__consumer_offsets"}
defaultContainer = "integration_nri_kafka_1"
defaultBinPath = "/nri-kafka"
@@ -327,3 +327,22 @@ func TestKafkaIntegration_bootstrap_inventory(t *testing.T) {
assert.Contains(t, stdout, topic, fmt.Sprintf("The output doesn't have the topic %s", topic))
}
}
+
+func TestKafkaIntegration_consumer_offset(t *testing.T) {
+ bootstrapDiscoverConfigInventory := func(command []string) []string {
+ return append(
+ bootstrapDiscoverConfig(command),
+ "--consumer_offset",
+ "--consumer_group_regex", ".*",
+ )
+ }
+
+ stdout, stderr, err := runIntegration(t, bootstrapDiscoverConfigInventory)
+
+ assert.NotNil(t, stderr, "unexpected stderr")
+ assert.NoError(t, err, "Unexpected error")
+
+ schemaPath := filepath.Join("json-schema-files", "kafka-schema-consumer-offset.json")
+ err = jsonschema.Validate(schemaPath, stdout)
+ assert.NoError(t, err, "The output of kafka integration doesn't have expected format.")
+}