Automatically Attempting to Recover Failed DMS Tasks
Date: 2023-05-11
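AWS DMS tasks fail mainly for the following reasons:
1. Source database failures (including database service interruptions and network communication failures)
2. Data errors, such as conversion errors and truncation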
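For AWS DMS task failures caused by a source database failure, you can use Amazon EventBridge to receive AWS DMS task events (this requires upgrading the AWS DMS replication instance to version 3.4.6 or later) and match them against a predefined event pattern. When an event matches, Amazon Simple Notification Service (Amazon SNS) sends a notification (for example, an email alert), and the same event is sent to an AWS Step Functions state machine, which automatically attempts to recover the AWS DMS task.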
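The following architecture diagram shows the solution design:
- Amazon EventBridge receives events from AWS DMS (task or replication instance events)
- For events that match the predefined event pattern, Amazon SNS sends an email alert
- For events that match the predefined event pattern, an AWS Step Functions state machine attempts to resume the task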
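The fan-out described above (one rule, two targets) can be sketched as a request for the EventBridge `PutTargets` API. This is a minimal sketch rather than the deployment code used in this post: the rule name, the target IDs, and every ARN are hypothetical placeholders, and it assumes the rule already exists and that the SNS topic's resource policy allows EventBridge to publish to it.

```python
def build_targets_request(rule_name, sns_topic_arn, state_machine_arn, role_arn):
    """Build keyword arguments for the EventBridge PutTargets API.

    Every name and ARN passed in is a placeholder chosen for illustration.
    """
    return {
        "Rule": rule_name,
        "Targets": [
            # SNS target: delivers the alert (for example, to an email subscription).
            {"Id": "dms-alert-topic", "Arn": sns_topic_arn},
            # Step Functions target: EventBridge needs an IAM role that is
            # allowed to start executions of the state machine.
            {
                "Id": "dms-recovery-state-machine",
                "Arn": state_machine_arn,
                "RoleArn": role_arn,
            },
        ],
    }


request = build_targets_request(
    rule_name="dms-task-failure-rule",  # hypothetical
    sns_topic_arn="arn:aws-cn:sns:cn-north-1:111122223333:dms-alerts",  # hypothetical
    state_machine_arn="arn:aws-cn:states:cn-north-1:111122223333:stateMachine:dms-recovery",  # hypothetical
    role_arn="arn:aws-cn:iam::111122223333:role/eventbridge-start-execution",  # hypothetical
)

# With boto3 installed and credentials configured, the request could be sent with:
#   boto3.client("events").put_targets(**request)
```

Building the request as plain data keeps the sketch runnable without AWS access; only the commented-out call touches the service.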
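The sections below describe the EventBridge rule configuration, the state machine flow, and sample code in detail.
The Amazon EventBridge event pattern is defined as follows: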
    {
      "source": ["aws.dms"],
      "resources": ["your-task-arn"],
      "detail": {
        "type": ["REPLICATION_TASK"],
        "category": ["Failure", "StateChange"],
        "eventType": ["REPLICATION_TASK_FAILED", "REPLICATION_TASK_STOPPED"]
      }
    }
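To see which events the pattern above would select, the matching rule can be approximated locally. The sketch below is a deliberately simplified matcher, not EventBridge's implementation: it only handles the two constructs this pattern uses (a list meaning "one of these values" and a nested object), and the sample events contain only the fields the pattern inspects. The task ARN and both sample events are hypothetical.

```python
def matches(pattern, event):
    """Return True if `event` satisfies `pattern` (simplified semantics).

    A list in the pattern means "the event value must be one of these";
    a dict means "recurse into the nested object". If the event value is
    itself a list (such as `resources`), any overlap counts as a match.
    """
    for key, expected in pattern.items():
        if key not in event:
            return False
        actual = event[key]
        if isinstance(expected, dict):
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        else:
            actual_values = actual if isinstance(actual, list) else [actual]
            if not any(value in expected for value in actual_values):
                return False
    return True


TASK_ARN = "arn:aws-cn:dms:cn-north-1:111122223333:task:EXAMPLE"  # hypothetical

PATTERN = {
    "source": ["aws.dms"],
    "resources": [TASK_ARN],
    "detail": {
        "type": ["REPLICATION_TASK"],
        "category": ["Failure", "StateChange"],
        "eventType": ["REPLICATION_TASK_FAILED", "REPLICATION_TASK_STOPPED"],
    },
}

# Hypothetical, trimmed-down events: real events carry more fields.
failed_event = {
    "source": "aws.dms",
    "resources": [TASK_ARN],
    "detail": {
        "type": "REPLICATION_TASK",
        "category": "Failure",
        "eventType": "REPLICATION_TASK_FAILED",
    },
}
started_event = {
    "source": "aws.dms",
    "resources": [TASK_ARN],
    "detail": {
        "type": "REPLICATION_TASK",
        "category": "StateChange",
        "eventType": "REPLICATION_TASK_STARTED",
    },
}

print(matches(PATTERN, failed_event))   # True
print(matches(PATTERN, started_event))  # False
```

Listing both `REPLICATION_TASK_FAILED` and `REPLICATION_TASK_STOPPED` under `eventType` means a stopped task triggers the same alert and recovery path as a failed one, while other state changes are ignored.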
The AWS Step Functions state machine diagram is shown below:
The state machine is defined as follows:
    {
      "Comment": "A description of my state machine",
      "StartAt": "检查DMS任务状态",
      "States": {
        "检查DMS任务状态": {
          "Type": "Task",
          "Next": "任务状态判断和处理",
          "Parameters": {
            "Filters": [
              {
                "Name": "replication-task-arn",
                "Values.$": "$.replication-task-arn"
              }
            ]
          },
          "Resource": "arn:aws-cn:states:::aws-sdk:databasemigration:describeReplicationTasks",
          "ResultSelector": {
            "ReplicationTasks.$": "$.ReplicationTasks"
          },
          "ResultPath": "$"
        },
        "任务状态判断和处理": {
          "Type": "Map",
          "ItemsPath": "$.ReplicationTasks",
          "Iterator": {
            "StartAt": "任务是否正在运行",
            "States": {
              "任务是否正在运行": {
                "Type": "Choice",
                "Choices": [
                  {
                    "Variable": "$.Status",
                    "StringEquals": "stopped",
                    "Next": "开始启动任务",
                    "Comment": "Task is stopped"
                  },
                  {
                    "Variable": "$.Status",
                    "StringEquals": "failed",
                    "Next": "开始启动任务",
                    "Comment": "Task has failed"
                  },
                  {
                    "Variable": "$.Status",
                    "StringEquals": "running",
                    "Next": "Success",
                    "Comment": "Task is running"
                  },
                  {
                    "Variable": "$.Status",
                    "StringEquals": "starting",
                    "Next": "任务启动中,等待30s",
                    "Comment": "Task is starting"
                  },
                  {
                    "Variable": "$.Status",
                    "StringEquals": "stopping",
                    "Next": "任务正在停止,等待30s",
                    "Comment": "Task is stopping"
                  }
                ]
              },
              "任务正在停止,等待30s": {
                "Type": "Wait",
                "Seconds": 30,
                "Next": "完善任务arn"
              },
              "任务启动中,等待30s": {
                "Type": "Wait",
                "Seconds": 30,
                "Next": "完善任务arn"
              },
              "任务正在停止,再次等待30s": {
                "Type": "Wait",
                "Seconds": 30,
                "Next": "完善任务arn"
              },
              "开始启动任务": {
                "Type": "Task",
                "Parameters": {
                  "ReplicationTaskArn.$": "$.ReplicationTaskArn",
                  "StartReplicationTaskType": "resume-processing"
                },
                "Resource": "arn:aws-cn:states:::aws-sdk:databasemigration:startReplicationTask",
                "Next": "任务启动中,等待30s",
                "ResultSelector": {
                  "ReplicationTaskArn.$": "$.ReplicationTask.ReplicationTaskArn",
                  "Status.$": "$.ReplicationTask.Status"
                }
              },
              "完善任务arn": {
                "Type": "Task",
                "Resource": "arn:aws-cn:states:::lambda:invoke",
                "Parameters": {
                  "Payload.$": "$",
                  "FunctionName": "your-lambda-arn"
                },
                "Retry": [
                  {
                    "ErrorEquals": [
                      "Lambda.ServiceException",
                      "Lambda.AWSLambdaException",
                      "Lambda.SdkClientException"
                    ],
                    "IntervalSeconds": 2,
                    "MaxAttempts": 6,
                    "BackoffRate": 2
                  }
                ],
                "Next": "检查任务状态",
                "ResultSelector": {
                  "ReplicationTaskArn.$": "$.Payload.ReplicationTaskArn",
                  "Status.$": "$.Payload.Status"
                }
              },
              "检查任务状态": {
                "Type": "Task",
                "Next": "任务状态",
                "Parameters": {
                  "Filters": [
                    {
                      "Name": "replication-task-arn",
                      "Values.$": "$.ReplicationTaskArn"
                    }
                  ]
                },
                "Resource": "arn:aws-cn:states:::aws-sdk:databasemigration:describeReplicationTasks",
                "ResultSelector": {
                  "Status.$": "$.ReplicationTasks[0].Status",
                  "ReplicationTaskArn.$": "$.ReplicationTasks[0].ReplicationTaskArn"
                }
              },
              "任务状态": {
                "Type": "Choice",
                "Choices": [
                  {
                    "Variable": "$.Status",
                    "StringEquals": "running",
                    "Next": "Success",
                    "Comment": "Task is running"
                  },
                  {
                    "Variable": "$.Status",
                    "StringEquals": "starting",
                    "Next": "任务启动中,等待30s",
                    "Comment": "Task is starting"
                  },
                  {
                    "Variable": "$.Status",
                    "StringEquals": "stopped",
                    "Next": "开始启动任务",
                    "Comment": "Task is stopped"
                  },
                  {
                    "Variable": "$.Status",
                    "StringEquals": "failed",
                    "Next": "开始启动任务",
                    "Comment": "Task has failed"
                  },
                  {
                    "Variable": "$.Status",
                    "StringEquals": "stopping",
                    "Next": "任务正在停止,再次等待30s",
                    "Comment": "Task is stopping"
                  }
                ],
                "Default": "Success"
              },
              "Success": {
                "Type": "Succeed"
              }
            }
          },
          "End": true
        }
      }
    }
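The control flow of the state machine (check the status, resume a stopped or failed task, wait 30 seconds while it is starting or stopping, then check again) can be approximated in plain Python. This is a sketch for reasoning about the loop, not a replacement for the state machine: `recover_task` and `FakeDmsClient` are hypothetical names, the `max_polls` bound is an added assumption (the state machine above has no such limit), and unknown statuses end the loop as in the `Default` branch of the second Choice state. The two client methods mirror the boto3 DMS calls `describe_replication_tasks` and `start_replication_task`.

```python
import time


def recover_task(client, task_arn, wait_seconds=30, max_polls=20, sleep=time.sleep):
    """Poll a DMS task and resume it until it leaves the transitional states."""
    status = None
    for _ in range(max_polls):
        response = client.describe_replication_tasks(
            Filters=[{"Name": "replication-task-arn", "Values": [task_arn]}]
        )
        status = response["ReplicationTasks"][0]["Status"]
        if status in ("stopped", "failed"):
            # Same request as the start-task state: resume, do not reload.
            client.start_replication_task(
                ReplicationTaskArn=task_arn,
                StartReplicationTaskType="resume-processing",
            )
            sleep(wait_seconds)
        elif status in ("starting", "stopping"):
            sleep(wait_seconds)
        else:
            # "running", or any other status, ends the loop.
            return status
    return status


class FakeDmsClient:
    """Stand-in client so the sketch runs without AWS access."""

    def __init__(self, statuses):
        self._statuses = list(statuses)
        self.start_calls = 0

    def describe_replication_tasks(self, Filters):
        # Replay the scripted statuses, repeating the last one forever.
        if len(self._statuses) > 1:
            status = self._statuses.pop(0)
        else:
            status = self._statuses[0]
        return {"ReplicationTasks": [{"Status": status}]}

    def start_replication_task(self, ReplicationTaskArn, StartReplicationTaskType):
        self.start_calls += 1
        return {}


fake = FakeDmsClient(["failed", "starting", "running"])
final_status = recover_task(fake, "your-task-arn", sleep=lambda seconds: None)
print(final_status, fake.start_calls)  # running 1
```

Passing the client and the sleep function as arguments keeps the loop testable; with real credentials, `boto3.client("dms")` could be passed in place of the fake.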
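The `完善任务arn` state uses an AWS Lambda function to convert the AWS DMS task ARN into an array, because the `Values` field of the `describeReplicationTasks` filter in the next state expects a list. Sample code: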
    import json


    def lambda_handler(event, context):
        # Wrap the task ARN in a list so it can be used as the "Values"
        # of the describeReplicationTasks filter in the next state.
        replication_task_arn = [event["ReplicationTaskArn"]]
        return {
            "Status": event["Status"],
            "ReplicationTaskArn": replication_task_arn,
            "statusCode": 200,
            "body": json.dumps("Here is to convert the resource arn to array!"),
        }