Automatically Attempting Recovery of Failed DMS Tasks
Date: 2023-05-11

 

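AWS DMS tasks fail mainly for the following reasons:

1. Source database faults (including database service interruptions and network communication failures)

2. Data errors, such as conversion errors and truncation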

 

 

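For AWS DMS task failures caused by source database faults, you can use Amazon EventBridge to receive AWS DMS task events (this requires upgrading the AWS DMS replication instance to version 3.4.6 or later) and match them against a predefined Event Pattern. When an event matches, Amazon Simple Notification Service (Amazon SNS) sends a notification (for example, an email alert), and the event is also sent to an AWS Step Functions state machine, which automatically attempts to recover the AWS DMS task.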

 

 

The following architecture diagram shows the design of the solution:

 

 

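  1. Amazon EventBridge receives events from AWS DMS (for tasks or replication instances).
  2. For events that match the predefined Event Pattern, Amazon SNS sends an email alert.
  3. For events that match the predefined Event Pattern, an AWS Step Functions state machine attempts to bring the task back to a running state.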
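The three steps above can be wired together with one EventBridge rule and two targets. The following is a minimal sketch using boto3, not the author's actual setup: the rule name, target IDs, and all ARNs are hypothetical placeholders, and the `InputTransformer` that passes the event's `resources` array to the state machine as `replication-task-arn` is an assumption about how the state machine input is produced, since the article does not say. The SNS topic is also assumed to have a resource policy that allows EventBridge to publish to it.

```python
import json

# Hypothetical placeholders; replace with real ARNs before use.
TASK_ARN = "your-task-arn"
SNS_TOPIC_ARN = "your-sns-topic-arn"
STATE_MACHINE_ARN = "your-state-machine-arn"
EVENTS_ROLE_ARN = "your-eventbridge-role-arn"


def build_event_pattern(task_arn):
    """Event Pattern for failed or stopped events of a single DMS task."""
    return {
        "source": ["aws.dms"],
        "resources": [task_arn],
        "detail": {
            "type": ["REPLICATION_TASK"],
            "category": ["Failure", "StateChange"],
            "eventType": ["REPLICATION_TASK_FAILED", "REPLICATION_TASK_STOPPED"],
        },
    }


def build_targets(sns_topic_arn, state_machine_arn, role_arn):
    """Two targets: an SNS topic for alerts and a state machine for recovery."""
    return [
        {"Id": "notify-by-email", "Arn": sns_topic_arn},
        {
            "Id": "recover-dms-task",
            "Arn": state_machine_arn,
            # EventBridge needs a role that allows states:StartExecution.
            "RoleArn": role_arn,
            # Assumption: forward the "resources" array as "replication-task-arn".
            "InputTransformer": {
                "InputPathsMap": {"arn": "$.resources"},
                "InputTemplate": '{"replication-task-arn": <arn>}',
            },
        },
    ]


def create_rule(rule_name="dms-task-recovery"):
    """Create the rule and attach both targets (requires AWS credentials)."""
    import boto3  # imported lazily so the builders above work without boto3

    events = boto3.client("events")
    events.put_rule(
        Name=rule_name,
        EventPattern=json.dumps(build_event_pattern(TASK_ARN)),
        State="ENABLED",
    )
    events.put_targets(
        Rule=rule_name,
        Targets=build_targets(SNS_TOPIC_ARN, STATE_MACHINE_ARN, EVENTS_ROLE_ARN),
    )
```

Keeping the pattern and target definitions in plain functions makes them easy to inspect or reuse in infrastructure-as-code tooling before anything is created in the account.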

 

The sections below describe the EventBridge rule configuration, the state machine flow, and sample code in detail.

 

 

The Amazon EventBridge Event Pattern is defined as follows:

{
    "source": ["aws.dms"],
    "resources": ["your-task-arn"],
    "detail": {
        "type": ["REPLICATION_TASK"],
        "category": ["Failure", "StateChange"],
        "eventType": ["REPLICATION_TASK_FAILED", "REPLICATION_TASK_STOPPED"]
    }
}
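To get a feel for which events this pattern selects, the matching logic can be approximated locally. The sketch below is a simplified stand-in for EventBridge matching that only handles exact-value lists and nested objects (no prefix, numeric, or `anything-but` operators); the `matches` helper and the sample event are hypothetical and only contain the fields the pattern inspects.

```python
PATTERN = {
    "source": ["aws.dms"],
    "resources": ["your-task-arn"],
    "detail": {
        "type": ["REPLICATION_TASK"],
        "category": ["Failure", "StateChange"],
        "eventType": ["REPLICATION_TASK_FAILED", "REPLICATION_TASK_STOPPED"],
    },
}


def matches(pattern, event):
    """Return True if every field named in the pattern matches the event."""
    for key, expected in pattern.items():
        if key not in event:
            return False
        actual = event[key]
        if isinstance(expected, dict):
            # Nested object: recurse into the corresponding event object.
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        else:
            # A pattern list means "any of these values"; an event array
            # matches if at least one of its elements is allowed.
            actual_values = actual if isinstance(actual, list) else [actual]
            if not any(value in expected for value in actual_values):
                return False
    return True


# Hypothetical event, reduced to the fields the pattern looks at.
sample_event = {
    "source": "aws.dms",
    "resources": ["your-task-arn"],
    "detail": {
        "type": "REPLICATION_TASK",
        "category": "Failure",
        "eventType": "REPLICATION_TASK_FAILED",
    },
}

print(matches(PATTERN, sample_event))
```

For a check against the real matching engine, the EventBridge `TestEventPattern` API (`test_event_pattern` in boto3) can be used with a complete event instead.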

 

 

 

The AWS Step Functions state machine diagram is shown below:

 

 

The state machine is defined as follows:

 

{
  "Comment": "A description of my state machine",
  "StartAt": "检查DMS任务状态",
  "States": {
    "检查DMS任务状态": {
      "Type": "Task",
      "Next": "任务状态判断和处理",
      "Parameters": {
        "Filters": [
          {
            "Name": "replication-task-arn",
            "Values.$": "$.replication-task-arn"
          }
        ]
      },
      "Resource": "arn:aws-cn:states:::aws-sdk:databasemigration:describeReplicationTasks",
      "ResultSelector": {
        "ReplicationTasks.$": "$.ReplicationTasks"
      },
      "ResultPath": "$"
    },
    "任务状态判断和处理": {
      "Type": "Map",
      "ItemsPath": "$.ReplicationTasks",
      "Iterator": {
        "StartAt": "任务是否正在运行",
        "States": {
          "任务是否正在运行": {
            "Type": "Choice",
            "Choices": [
              {
                "Variable": "$.Status",
                "StringEquals": "stopped",
                "Next": "开始启动任务",
                "Comment": "Task is stopped"
              },
              {
                "Variable": "$.Status",
                "StringEquals": "failed",
                "Next": "开始启动任务",
                "Comment": "Task has failed"
              },
              {
                "Variable": "$.Status",
                "StringEquals": "running",
                "Next": "Success",
                "Comment": "Task is running"
              },
              {
                "Variable": "$.Status",
                "StringEquals": "starting",
                "Next": "任务启动中,等待30s",
                "Comment": "Task is starting"
              },
              {
                "Variable": "$.Status",
                "StringEquals": "stopping",
                "Next": "任务正在停止,等待30s",
                "Comment": "Task is stopping"
              }
            ]
          },
          "任务正在停止,等待30s": {
            "Type": "Wait",
            "Seconds": 30,
            "Next": "完善任务arn"
          },
          "任务启动中,等待30s": {
            "Type": "Wait",
            "Seconds": 30,
            "Next": "完善任务arn"
          },
          "任务正在停止,再次等待30s": {
            "Type": "Wait",
            "Seconds": 30,
            "Next": "完善任务arn"
          },
          "开始启动任务": {
            "Type": "Task",
            "Parameters": {
              "ReplicationTaskArn.$": "$.ReplicationTaskArn",
              "StartReplicationTaskType": "resume-processing"
            },
            "Resource": "arn:aws-cn:states:::aws-sdk:databasemigration:startReplicationTask",
            "Next": "任务启动中,等待30s",
            "ResultSelector": {
              "ReplicationTaskArn.$": "$.ReplicationTask.ReplicationTaskArn",
              "Status.$": "$.ReplicationTask.Status"
            }
          },
          "完善任务arn": {
            "Type": "Task",
            "Resource": "arn:aws-cn:states:::lambda:invoke",
            "Parameters": {
              "Payload.$": "$",
              "FunctionName": "your-lambda-arn"
            },
            "Retry": [
              {
                "ErrorEquals": [
                  "Lambda.ServiceException",
                  "Lambda.AWSLambdaException",
                  "Lambda.SdkClientException"
                ],
                "IntervalSeconds": 2,
                "MaxAttempts": 6,
                "BackoffRate": 2
              }
            ],
            "Next": "检查任务状态",
            "ResultSelector": {
              "ReplicationTaskArn.$": "$.Payload.ReplicationTaskArn",
              "Status.$": "$.Payload.Status"
            }
          },
          "检查任务状态": {
            "Type": "Task",
            "Next": "任务状态",
            "Parameters": {
              "Filters": [
                {
                  "Name": "replication-task-arn",
                  "Values.$": "$.ReplicationTaskArn"
                }
              ]
            },
            "Resource": "arn:aws-cn:states:::aws-sdk:databasemigration:describeReplicationTasks",
            "ResultSelector": {
              "Status.$": "$.ReplicationTasks[0].Status",
              "ReplicationTaskArn.$": "$.ReplicationTasks[0].ReplicationTaskArn"
            }
          },
          "任务状态": {
            "Type": "Choice",
            "Choices": [
              {
                "Variable": "$.Status",
                "StringEquals": "running",
                "Next": "Success",
                "Comment": "Task is running"
              },
              {
                "Variable": "$.Status",
                "StringEquals": "starting",
                "Next": "任务启动中,等待30s",
                "Comment": "Task is starting"
              },
              {
                "Variable": "$.Status",
                "StringEquals": "stopped",
                "Next": "开始启动任务",
                "Comment": "Task is stopped"
              },
              {
                "Variable": "$.Status",
                "StringEquals": "failed",
                "Next": "开始启动任务",
                "Comment": "Task has failed"
              },
              {
                "Variable": "$.Status",
                "StringEquals": "stopping",
                "Next": "任务正在停止,再次等待30s",
                "Comment": "Task is stopping"
              }
            ],
            "Default": "Success"
          },
          "Success": {
            "Type": "Succeed"
          }
        }
      },
      "End": true
    }
  }
}
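The control flow of the state machine (check the status, restart a stopped or failed task, wait 30 seconds while it is starting or stopping, then check again) can be easier to follow as a plain loop. The sketch below is an approximation in Python and not a replacement for the state machine: `recover_task` and its parameters are hypothetical names, the status lookup and start call are injected as callables, and the `max_checks` limit is an added assumption, since the state machine itself has no such bound.

```python
import time

RESTARTABLE = {"stopped", "failed"}      # states that trigger a restart
TRANSITIONAL = {"starting", "stopping"}  # states that only need waiting


def recover_task(task_arn, describe_status, start_task,
                 wait_seconds=30, max_checks=20, sleep=time.sleep):
    """Poll a task and restart it until it leaves the failed/stopped states.

    describe_status(task_arn) returns the current status string;
    start_task(task_arn) resumes the task. Returns the last status seen.
    """
    status = None
    for _ in range(max_checks):
        status = describe_status(task_arn)
        if status in RESTARTABLE:
            start_task(task_arn)   # corresponds to startReplicationTask
            sleep(wait_seconds)    # then wait before checking again
        elif status in TRANSITIONAL:
            sleep(wait_seconds)
        else:
            # "running" or any other status ends the loop, like the
            # "Default": "Success" branch of the second Choice state.
            return status
    return status


# Usage with fake callables, so the flow can be followed without AWS access.
statuses = iter(["failed", "starting", "running"])
started = []
final = recover_task(
    "your-task-arn",
    describe_status=lambda arn: next(statuses),
    start_task=started.append,
    sleep=lambda seconds: None,  # skip real waiting in this demo
)
print(final, started)
```

With boto3, `describe_status` could wrap `describe_replication_tasks` with a `replication-task-arn` filter, and `start_task` could wrap `start_replication_task` with `StartReplicationTaskType="resume-processing"`, matching the calls the state machine makes.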

 

 

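The "完善任务arn" (complete the task ARN) state uses an AWS Lambda function to convert the AWS DMS task ARN into an array, because the next state passes it as the "Values" list of the describeReplicationTasks filter. Sample code: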

 

import json


def lambda_handler(event, context):
    # Wrap the single task ARN in a list so it can be used as the
    # "Values" array of the describeReplicationTasks filter.
    replication_task_arn = [event["ReplicationTaskArn"]]
    return {
        "Status": event["Status"],
        "ReplicationTaskArn": replication_task_arn,
        "statusCode": 200,
        "body": json.dumps("Here is to convert the resource arn to array!"),
    }