Skip to content

Commit

Permalink
full instance implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
sgmv committed Aug 6, 2024
1 parent dbecdc1 commit 711c74f
Show file tree
Hide file tree
Showing 26 changed files with 909 additions and 9 deletions.
2 changes: 1 addition & 1 deletion openstack/dms/v2.1/instances/BatchRestartDelete.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func BatchRestartDelete(client *golangsdk.ServiceClient, opts CreateOpts) (*Batc
return nil, err
}

raw, err := client.Post(client.ServiceURL(resourcePath, actionEdnpoint), b, nil, &golangsdk.RequestOpts{
raw, err := client.Post(client.ServiceURL(ResourcePath, actionEdnpoint), b, nil, &golangsdk.RequestOpts{
OkCodes: []int{200, 204},
})
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion openstack/dms/v2.1/instances/Create.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func Create(client *golangsdk.ServiceClient, opts CreateOpts) (*InstanceIDResp,
// For all requests we use schema /v2/{project_id}/instances
// But for a creation /v2/{engine}/{project_id}/instances
paths := strings.SplitN(client.Endpoint, "v2", 2)
url := fmt.Sprintf("%sv2/%s%s%s", paths[0], opts.Engine, paths[1], resourcePath)
url := fmt.Sprintf("%sv2/%s%s%s", paths[0], opts.Engine, paths[1], ResourcePath)

raw, err := client.Post(url, b, nil, &golangsdk.RequestOpts{
OkCodes: []int{200},
Expand Down
2 changes: 1 addition & 1 deletion openstack/dms/v2.1/instances/Delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import golangsdk "github.com/opentelekomcloud/gophertelekomcloud"
// Delete an instance by id
// Send DELETE /v2/{project_id}/instances/{instance_id}
func Delete(client *golangsdk.ServiceClient, id string) error {
_, err := client.Delete(client.ServiceURL(resourcePath, id), &golangsdk.RequestOpts{
_, err := client.Delete(client.ServiceURL(ResourcePath, id), &golangsdk.RequestOpts{
OkCodes: []int{204},
})
return err
Expand Down
2 changes: 1 addition & 1 deletion openstack/dms/v2.1/instances/Get.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ type Instance struct {
// Get an instance with detailed information by id
// Send GET /v2/{project_id}/instances/{instance_id}
func Get(client *golangsdk.ServiceClient, id string) (*Instance, error) {
raw, err := client.Get(client.ServiceURL(resourcePath, id), nil, nil)
raw, err := client.Get(client.ServiceURL(ResourcePath, id), nil, nil)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion openstack/dms/v2.1/instances/GetConf.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const configsEndpoint = "configs"
// GetInstanceConf is used to obtain instance configurations.
// Send GET /v2/{project_id}/instances/{instance_id}/configs
func GetInstanceConf(client *golangsdk.ServiceClient, id string) (*InstanceConfs, error) {
raw, err := client.Get(client.ServiceURL(resourcePath, id, configsEndpoint), nil, nil)
raw, err := client.Get(client.ServiceURL(ResourcePath, id, configsEndpoint), nil, nil)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion openstack/dms/v2.1/instances/List.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type ListOpts struct {
}

func List(client *golangsdk.ServiceClient, opts ListOpts) (*ListResponse, error) {
url, err := golangsdk.NewURLBuilder().WithEndpoints(resourcePath).WithQueryParams(&opts).Build()
url, err := golangsdk.NewURLBuilder().WithEndpoints(ResourcePath).WithQueryParams(&opts).Build()
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion openstack/dms/v2.1/instances/Put.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func Update(client *golangsdk.ServiceClient, id string, opts UpdateOpts) error {
return err
}

_, err = client.Put(client.ServiceURL(resourcePath, id), body, nil, &golangsdk.RequestOpts{
_, err = client.Put(client.ServiceURL(ResourcePath, id), body, nil, &golangsdk.RequestOpts{
OkCodes: []int{204},
})
return err
Expand Down
2 changes: 1 addition & 1 deletion openstack/dms/v2.1/instances/PutConf.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func UpdateInstanceConf(client *golangsdk.ServiceClient, id string, opts UpdateI
return err
}

_, err = client.Put(client.ServiceURL(resourcePath, id, configsEndpoint), body, nil, &golangsdk.RequestOpts{
_, err = client.Put(client.ServiceURL(ResourcePath, id, configsEndpoint), body, nil, &golangsdk.RequestOpts{
OkCodes: []int{200},
})
return err
Expand Down
2 changes: 1 addition & 1 deletion openstack/dms/v2.1/instances/common.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package instances

// endpoint/instances
const resourcePath = "instances"
const ResourcePath = "instances"
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package management

import (
golangsdk "github.com/opentelekomcloud/gophertelekomcloud"
"github.com/opentelekomcloud/gophertelekomcloud/internal/build"
"github.com/opentelekomcloud/gophertelekomcloud/internal/extract"
"github.com/opentelekomcloud/gophertelekomcloud/openstack/dms/v2.1/instances"
)

const batchDeletePath = "batch-delete"

type BatchDeleteConsumberGroupOpts struct {
// IDs of all consumer groups to be deleted.
GroupIds []string `json:"group_ids" required:"true"`
}

// BatchDeleteConsumerGroupFromInstance is used to delete multiple consumer groups of a Kafka instance in batches.
// Send POST /v2/{project_id}/instances/{instance_id}/groups/batch-delete
func BatchDeleteConsumerGroupFromInstance(client *golangsdk.ServiceClient, instanceId, groupId string, opts BatchDeleteConsumberGroupOpts) (*BatchDeleteConsumerGroupResp, error) {
body, err := build.RequestBody(opts, "")
if err != nil {
return nil, err
}

raw, err := client.Post(client.ServiceURL(instances.ResourcePath, instanceId, groupPath, batchDeletePath), body, nil, &golangsdk.RequestOpts{
OkCodes: []int{204},
})
if err != nil {
return nil, err
}

var res BatchDeleteConsumerGroupResp
err = extract.Into(raw.Body, &res)
return &res, err
}

type BatchDeleteConsumerGroupResp struct {
// List of consumer groups that failed to be deleted.
FailedGroups []*FailedGroup `json:"failed_groups"`
// Number of records that fail to be deleted.
Total int `json:"total"`
}

type FailedGroup struct {
// ID of consumer groups that failed to be deleted.
GroupId string `json:"group_id"`
// Cause of the deletion failure.
ErrorMessage string `json:"error_message"`
}
28 changes: 28 additions & 0 deletions openstack/dms/v2.1/instances/management/ConfAutoTopicCreation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package management

import (
golangsdk "github.com/opentelekomcloud/gophertelekomcloud"
"github.com/opentelekomcloud/gophertelekomcloud/internal/build"
"github.com/opentelekomcloud/gophertelekomcloud/openstack/dms/v2.1/instances"
)

const confAutoTopicCreationPath = "autotopic"

type ConfAutoTopicCreationOpts struct {
EnableAutoTopic bool `json:"enable_auto_topic" required:"true"`
}

// ConfAutoTopicCreation is used to enable or disable automatic topic creation.
// Send POST /v2/{project_id}/instances/{instance_id}/autotopic
func ConfAutoTopicCreation(client *golangsdk.ServiceClient, id string, opts ConfAutoTopicCreationOpts) error {
body, err := build.RequestBody(opts, "")
if err != nil {
return err
}

_, err = client.Post(client.ServiceURL(instances.ResourcePath, id, confAutoTopicCreationPath), body, nil, &golangsdk.RequestOpts{
OkCodes: []int{204},
})

return err
}
40 changes: 40 additions & 0 deletions openstack/dms/v2.1/instances/management/CreateConsumerGroup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package management

import (
golangsdk "github.com/opentelekomcloud/gophertelekomcloud"
"github.com/opentelekomcloud/gophertelekomcloud/internal/build"
"github.com/opentelekomcloud/gophertelekomcloud/internal/extract"
"github.com/opentelekomcloud/gophertelekomcloud/openstack/dms/v2.1/instances"
)

type CreateConsumerGroupOpts struct {
// Consumer group name.
GroupName string `json:"group_name" required:"true"`
// Consumer group description.
// Minimum: 0
// Maximum: 200
Description string `json:"group_desc"`
}

// CreateConsumerGroup is used to create a consumer group.
// Send POST /v2/{project_id}/kafka/instances/{instance_id}/group
func CreateConsumerGroup(client *golangsdk.ServiceClient, instanceId string, opts *CreateConsumerGroupOpts) (*ErrorResp, error) {
body, err := build.RequestBody(opts, "")
if err != nil {
return nil, err
}

raw, err := client.Post(client.ServiceURL(instances.ResourcePath, instanceId, groupPath), body, nil, &golangsdk.RequestOpts{})
if err != nil {
return nil, err
}

var res ErrorResp
err = extract.Into(raw.Body, &res)
return &res, err
}

type ErrorResp struct {
ErrorCode string `json:"error_code,omitempty"`
ErrorMessage string `json:"error_message,omitempty"`
}
53 changes: 53 additions & 0 deletions openstack/dms/v2.1/instances/management/CrossVPCModify.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package management

import (
golangsdk "github.com/opentelekomcloud/gophertelekomcloud"
"github.com/opentelekomcloud/gophertelekomcloud/internal/build"
"github.com/opentelekomcloud/gophertelekomcloud/internal/extract"
"github.com/opentelekomcloud/gophertelekomcloud/openstack/dms/v2.1/instances"
)

const (
crossVPCPath = "crossvpc"
modifyPath = "modify"
)

type CrossVPCModifyOpts struct {
AdvertisedIpContents map[string]string `json:"advertised_ip_contents" required:"true"`
}

// CrossVPCModify is used to modify the private IP address for cross-VPC access.
// Send POST to /v2/{project_id}/instances/{instance_id}/crossvpc/modify
func CrossVPCModify(client *golangsdk.ServiceClient, id string, opts PasswordOpts) (*CrossVPCModifyResp, error) {
body, err := build.RequestBody(opts, "")
if err != nil {
return nil, err
}

raw, err := client.Post(client.ServiceURL(instances.ResourcePath, id, crossVPCPath, modifyPath), body, nil, &golangsdk.RequestOpts{
OkCodes: []int{200},
})
if err != nil {
return nil, err
}

var res CrossVPCModifyResp
err = extract.Into(raw.Body, &res)
return &res, err
}

type CrossVPCModifyResp struct {
// Result of the cross-VPC access modification.
Success bool `json:"success"`
// Details of the result of the cross-VPC access modification.
Results []*Result `json:"results"`
}

type Result struct {
// advertised.listeners IP address or domain name.
AdvertisedIp string `json:"advertised_ip"`
// Status of the cross-VPC access modification.
Success bool `json:"success"`
// Listeners IP address.
Ip string `json:"ip"`
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package management

import (
golangsdk "github.com/opentelekomcloud/gophertelekomcloud"
"github.com/opentelekomcloud/gophertelekomcloud/openstack/dms/v2.1/instances"
)

// DeleteConsumerGroupFromInstance is used to delete a consumer group from a Kafka instance.
// Send DELETE /v2/{project_id}/instances/{instance_id}/groups/{group}
func DeleteConsumerGroupFromInstance(client *golangsdk.ServiceClient, instanceId, groupId string) error {
_, err := client.Delete(client.ServiceURL(instances.ResourcePath, instanceId, groupPath, groupId), &golangsdk.RequestOpts{
OkCodes: []int{204},
})
return err
}
25 changes: 25 additions & 0 deletions openstack/dms/v2.1/instances/management/GetConsumerGroup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package management

import (
golangsdk "github.com/opentelekomcloud/gophertelekomcloud"
"github.com/opentelekomcloud/gophertelekomcloud/internal/extract"
"github.com/opentelekomcloud/gophertelekomcloud/openstack/dms/v2.1/instances"
)

// GetConsumerGroup is used to query a specific consumer group.
// Send GET /v2/{project_id}/instances/{instance_id}/groups/{group}
func GetConsumerGroup(client *golangsdk.ServiceClient, instanceId, groupId string) (*GetConsumerGropusResp, error) {
raw, err := client.Get(client.ServiceURL(instances.ResourcePath, instanceId, groupPath, groupId), nil, nil)
if err != nil {
return nil, err
}

var res GetConsumerGropusResp
err = extract.Into(raw.Body, &res)
return &res, err
}

type GetConsumerGropusResp struct {
// Consumer group information.
Group []*Group `json:"group"`
}
79 changes: 79 additions & 0 deletions openstack/dms/v2.1/instances/management/GetConsumerGroupDetails.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package management

import (
golangsdk "github.com/opentelekomcloud/gophertelekomcloud"
"github.com/opentelekomcloud/gophertelekomcloud/internal/extract"
"github.com/opentelekomcloud/gophertelekomcloud/openstack/dms/v2.1/instances"
)

const (
groupPath = "groups"
)

// GetConsumerGroupDetails is used to query consumer group details.
// Send GET /v2/{project_id}/instances/{instance_id}/management/groups/{group}
func GetConsumerGroupDetails(client *golangsdk.ServiceClient, instanceId, groupId string) (*ConsumerGroupResp, error) {
raw, err := client.Get(client.ServiceURL(instances.ResourcePath, instanceId, managementPath, groupPath, groupId), nil, nil)
if err != nil {
return nil, err
}

var res ConsumerGroupResp
err = extract.Into(raw.Body, &res)
return &res, err
}

type ConsumerGroupResp struct {
// Details of the result of the cross-VPC access modification.
Group *Group `json:"group"`
}

type Group struct {
// Consumer group name.
GroupId string `json:"group_id"`
// Consumer group status. The value can be:
// Dead: The consumer group has no members and no metadata.
// Empty: The consumer group has metadata but has no members.
// PreparingRebalance: The consumer group is to be rebalanced.
// CompletingRebalance: All members have jointed the group.
// Stable: Members in the consumer group can consume messages normally.
State string `json:"state"`
// Coordinator ID.
CoordinatorId int `json:"coordinator_id"`
// Consumer list.
Members []*Member `json:"members"`
// Consumer offset.
GroupMessageOffsets []GroupMessageOffest `json:"group_message_offsets"`
// Partition assignment policy.
AssignmentStrategy string `json:"assignment_strategy"`
}

type Member struct {
// Consumer address.
Host string `json:"host"`
// Details about the partition assigned to the consumer.
Assignment []*Assignment `json:"assignment"`
// Consumer ID.
MemberId string `json:"member_id"`
// Client ID.
ClientId string `json:"client_id"`
}
type Assignment struct {
// Topic name.
Topic string `json:"topic"`
// Partition list.
Partitions []int `json:"partitions"`
}

type GroupMessageOffest struct {
// Partition number.
Partition int `json:"partition"`
// Number of remaining messages that can be retrieved, that is, the number of accumulated messages.
Lag int64 `json:"lag"`
// Topic name.
Topic string `json:"topic"`
// Consumer offset.
MessageCurrentOffset int64 `json:"message_current_offset"`
// Log end offset (LEO).
MessageLogEndOffset int64 `json:"message_log_end_offset"`
}
Loading

0 comments on commit 711c74f

Please sign in to comment.