Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DynamoDB V2019.11.21 #12342

Merged
merged 16 commits into from
Apr 14, 2020
276 changes: 276 additions & 0 deletions aws/resource_aws_dynamodb_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,18 @@ func resourceAwsDynamoDbTable() *schema.Resource {
},
},
},
"replica": {
Type: schema.TypeSet,
Optional: true,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"region_name": {
Type: schema.TypeString,
Required: true,
},
},
},
},
},
}
}
Expand Down Expand Up @@ -418,6 +430,12 @@ func resourceAwsDynamoDbTableCreate(d *schema.ResourceData, meta interface{}) er
}
}

if _, ok := d.GetOk("replica"); ok {
if err := createDynamoDbReplicas(d.Id(), d.Get("replica").(*schema.Set).List(), conn); err != nil {
return fmt.Errorf("error enabled DynamoDB Table (%s) replicas: %s", d.Id(), err)
}
}

return resourceAwsDynamoDbTableRead(d, meta)
}

Expand Down Expand Up @@ -595,9 +613,88 @@ func resourceAwsDynamoDbTableUpdate(d *schema.ResourceData, meta interface{}) er
}
}

if d.HasChange("replica") {
if err := updateDynamoDbReplica(d, conn); err != nil {
return fmt.Errorf("error updating DynamoDB Table (%s) replica: %s", d.Id(), err)
}
}

return resourceAwsDynamoDbTableRead(d, meta)
}

func updateDynamoDbReplica(d *schema.ResourceData, conn *dynamodb.DynamoDB) error {
oRaw, nRaw := d.GetChange("replica")
o := oRaw.(*schema.Set)
n := nRaw.(*schema.Set)

removed := o.Difference(n).List()
added := n.Difference(o).List()

for _, replicaRaw := range added {
m, ok := replicaRaw.(map[string]interface{})

if !ok {
continue
}

regionName := m["region_name"].(string)

replicaUpdate := &dynamodb.ReplicationGroupUpdate{
Create: &dynamodb.CreateReplicationGroupMemberAction{
RegionName: aws.String(regionName),
},
}

input := &dynamodb.UpdateTableInput{
TableName: aws.String(d.Id()),
ReplicaUpdates: []*dynamodb.ReplicationGroupUpdate{replicaUpdate},
}

_, err := conn.UpdateTable(input)

if err != nil {
return fmt.Errorf("error adding DynamoDB Table (%s) replica (%s): %w", d.Id(), regionName, err)
}

if err := waitForDynamoDbReplicaUpdateToBeCompleted(d.Id(), regionName, 20*time.Minute, conn); err != nil {
return fmt.Errorf("error waiting for DynamoDB Table (%s) replica (%s) to update: %s", d.Id(), regionName, err)
}
}

for _, replicaRaw := range removed {
m, ok := replicaRaw.(map[string]interface{})

if !ok {
continue
}

regionName := m["region_name"].(string)

replicaUpdate := &dynamodb.ReplicationGroupUpdate{
Delete: &dynamodb.DeleteReplicationGroupMemberAction{
RegionName: aws.String(regionName),
},
}

input := &dynamodb.UpdateTableInput{
TableName: aws.String(d.Id()),
ReplicaUpdates: []*dynamodb.ReplicationGroupUpdate{replicaUpdate},
}

_, err := conn.UpdateTable(input)

if err != nil {
return fmt.Errorf("error adding DynamoDB Table (%s) replica (%s): %w", d.Id(), regionName, err)
}

if err := waitForDynamoDbReplicaDeleteToBeCompleted(d.Id(), regionName, 20*time.Minute, conn); err != nil {
return fmt.Errorf("error waiting for DynamoDB Table (%s) replica (%s) to update: %s", d.Id(), regionName, err)
}
}

return nil
}

func resourceAwsDynamoDbTableRead(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).dynamodbconn

Expand Down Expand Up @@ -652,6 +749,12 @@ func resourceAwsDynamoDbTableDelete(d *schema.ResourceData, meta interface{}) er

log.Printf("[DEBUG] DynamoDB delete table: %s", d.Id())

if replicas := d.Get("replica").(*schema.Set).List(); len(replicas) > 0 {
if err := deleteDynamoDbReplicas(d.Id(), replicas, conn); err != nil {
return fmt.Errorf("error deleting DynamoDB Table (%s) replicas: %s", d.Id(), err)
}
}

err := deleteAwsDynamoDbTable(d.Id(), conn)
if err != nil {
if isAWSErr(err, dynamodb.ErrCodeResourceNotFoundException, "Requested resource not found: Table: ") {
Expand Down Expand Up @@ -702,6 +805,52 @@ func deleteAwsDynamoDbTable(tableName string, conn *dynamodb.DynamoDB) error {
return err
}

func deleteDynamoDbReplicas(tableName string, replicas []interface{}, conn *dynamodb.DynamoDB) error {
for _, replica := range replicas {
var ops []*dynamodb.ReplicationGroupUpdate
if regionName, ok := replica.(map[string]interface{})["region_name"]; ok {
ops = append(ops, &dynamodb.ReplicationGroupUpdate{
Delete: &dynamodb.DeleteReplicationGroupMemberAction{
RegionName: aws.String(regionName.(string)),
},
})

input := &dynamodb.UpdateTableInput{
TableName: aws.String(tableName),
ReplicaUpdates: ops,
}

log.Printf("[DEBUG] Deleting DynamoDB Replicas to %v", input)

err := resource.Retry(20*time.Minute, func() *resource.RetryError {
_, err := conn.UpdateTable(input)
if err != nil {
if isAWSErr(err, "ThrottlingException", "") {
return resource.RetryableError(err)
}
if isAWSErr(err, dynamodb.ErrCodeLimitExceededException, "can be created, updated, or deleted simultaneously") {
return resource.RetryableError(err)
}

return resource.NonRetryableError(err)
}
return nil
})
if isResourceTimeoutError(err) {
_, err = conn.UpdateTable(input)
}
if err != nil {
return fmt.Errorf("Error deleting DynamoDB Replicas status: %s", err)
}

if err := waitForDynamoDbReplicaDeleteToBeCompleted(tableName, regionName.(string), 20*time.Minute, conn); err != nil {
return fmt.Errorf("Error waiting for DynamoDB replica delete: %s", err)
}
}
}
return nil
}

func waitForDynamodbTableDeletion(conn *dynamodb.DynamoDB, tableName string, timeout time.Duration) error {
stateConf := &resource.StateChangeConf{
Pending: []string{
Expand Down Expand Up @@ -738,6 +887,133 @@ func waitForDynamodbTableDeletion(conn *dynamodb.DynamoDB, tableName string, tim
return err
}

func waitForDynamoDbReplicaUpdateToBeCompleted(tableName string, region string, timeout time.Duration, conn *dynamodb.DynamoDB) error {
stateConf := &resource.StateChangeConf{
Pending: []string{
dynamodb.ReplicaStatusCreating,
dynamodb.ReplicaStatusUpdating,
dynamodb.ReplicaStatusDeleting,
},
Target: []string{
dynamodb.ReplicaStatusActive,
},
Timeout: timeout,
Refresh: func() (interface{}, string, error) {
result, err := conn.DescribeTable(&dynamodb.DescribeTableInput{
TableName: aws.String(tableName),
})
if err != nil {
return 42, "", err
}
log.Printf("[DEBUG] DynamoDB replicas: %s", result.Table.Replicas)

var targetReplica *dynamodb.ReplicaDescription

for _, replica := range result.Table.Replicas {
if aws.StringValue(replica.RegionName) == region {
targetReplica = replica
break
}
}

if targetReplica == nil {
return nil, dynamodb.ReplicaStatusCreating, nil
}

return result, aws.StringValue(targetReplica.ReplicaStatus), nil
},
}
_, err := stateConf.WaitForState()

return err
}

func waitForDynamoDbReplicaDeleteToBeCompleted(tableName string, region string, timeout time.Duration, conn *dynamodb.DynamoDB) error {
stateConf := &resource.StateChangeConf{
Pending: []string{
dynamodb.ReplicaStatusCreating,
dynamodb.ReplicaStatusUpdating,
dynamodb.ReplicaStatusDeleting,
dynamodb.ReplicaStatusActive,
},
Target: []string{""},
Timeout: timeout,
Refresh: func() (interface{}, string, error) {
result, err := conn.DescribeTable(&dynamodb.DescribeTableInput{
TableName: aws.String(tableName),
})
if err != nil {
return 42, "", err
}

log.Printf("[DEBUG] all replicas for waiting: %s", result.Table.Replicas)
var targetReplica *dynamodb.ReplicaDescription

for _, replica := range result.Table.Replicas {
if aws.StringValue(replica.RegionName) == region {
targetReplica = replica
break
}
}

if targetReplica == nil {
return nil, "", nil
}

return result, aws.StringValue(targetReplica.ReplicaStatus), nil
},
}
_, err := stateConf.WaitForState()

return err
}

func createDynamoDbReplicas(tableName string, replicas []interface{}, conn *dynamodb.DynamoDB) error {
for _, replica := range replicas {
var ops []*dynamodb.ReplicationGroupUpdate
if regionName, ok := replica.(map[string]interface{})["region_name"]; ok {
ops = append(ops, &dynamodb.ReplicationGroupUpdate{
Create: &dynamodb.CreateReplicationGroupMemberAction{
RegionName: aws.String(regionName.(string)),
},
})

input := &dynamodb.UpdateTableInput{
TableName: aws.String(tableName),
ReplicaUpdates: ops,
}

log.Printf("[DEBUG] Updating DynamoDB Replicas to %v", input)

err := resource.Retry(20*time.Minute, func() *resource.RetryError {
_, err := conn.UpdateTable(input)
if err != nil {
if isAWSErr(err, "ThrottlingException", "") {
return resource.RetryableError(err)
}
if isAWSErr(err, dynamodb.ErrCodeLimitExceededException, "can be created, updated, or deleted simultaneously") {
return resource.RetryableError(err)
}

return resource.NonRetryableError(err)
}
return nil
})
if isResourceTimeoutError(err) {
_, err = conn.UpdateTable(input)
}
if err != nil {
return fmt.Errorf("Error updating DynamoDB Replicas status: %s", err)
}

if err := waitForDynamoDbReplicaUpdateToBeCompleted(tableName, regionName.(string), 20*time.Minute, conn); err != nil {
return fmt.Errorf("Error waiting for DynamoDB replica update: %s", err)
}
}
}
return nil
}

func updateDynamoDbTimeToLive(tableName string, ttlList []interface{}, conn *dynamodb.DynamoDB) error {
ttlMap := ttlList[0].(map[string]interface{})

Expand Down
Loading