

# 使用 API 来衡量和管理数据质量
<a name="data-quality-using-apis"></a>

本主题介绍如何使用 API 来衡量和管理数据质量。

**Contents**
+ [先决条件](#using-apis-prerequisites)
+ [使用 AWS Glue Data Quality 建议](#using-apis-recommendations)
+ [使用 AWS Glue Data Quality 规则集](#using-apis-rulesets)
+ [使用 AWS Glue Data Quality 运行](#using-apis-runs)
+ [处理 AWS Glue Data Quality 结果](#using-apis-results)

## 先决条件
<a name="using-apis-prerequisites"></a>
+ 确保您的 boto3 版本是最新版本，以便它包含最新的 AWS Glue Data Quality API。
+ 确保您的 AWS CLI 版本是最新版本，以便包含最新的 CLI。

如果您使用 AWS Glue 作业来运行这些 API，则可以使用以下选项将 boto3 库更新到最新版本：

```
—additional-python-modules boto3==<version>
```

## 使用 AWS Glue Data Quality 建议
<a name="using-apis-recommendations"></a>

**要启动 AWS Glue Data Quality 建议，请运行：**

```
class GlueWrapper:
    """Encapsulates AWS Glue actions."""
    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client
        
    def start_data_quality_rule_recommendation_run(self, database_name, table_name, role_arn):
        """
        Starts a recommendation run that is used to generate rules when you don't know what rules to write. AWS Glue Data Quality 
        analyzes the data and comes up with recommendations for a potential ruleset. You can then triage the ruleset 
        and modify the generated ruleset to your liking.

        :param database_name: The name of the AWS Glue database which contains the dataset.
        :param table_name: The name of the AWS Glue table against which we want a recommendation
        :param role_arn: The Amazon Resource Name (ARN) of an AWS Identity and Access Management (IAM) role that grants permission to let AWS Glue access the resources it needs.

        """
        try:
            response = self.client.start_data_quality_rule_recommendation_run(
                DataSource={
                    'GlueTable': {
                        'DatabaseName': database_name,
                        'TableName': table_name
                    }
                },
                Role=role_arn
            )
        except ClientError as err:
            logger.error(
                "Couldn't start data quality recommendation run %s. Here's why: %s: %s", name,
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
        else:
            return response['RunId']
```

对于建议运行，您可以使用 `pushDownPredicates` 或 `catalogPartitionPredicates` 来提高性能，并且只能在目录源的特定分区上运行建议。

```
client.start_data_quality_rule_recommendation_run(
            DataSource={
                'GlueTable': {
                    'DatabaseName': database_name,
                    'TableName': table_name,
                    'AdditionalOptions': {
                        'pushDownPredicate': "year=2022"
                    }
                }
            },
            Role=role_arn,
            NumberOfWorkers=2,
            CreatedRulesetName='<rule_set_name>'
  )
```

**要获取 AWS Glue Data Quality 建议的结果，请运行：**

```
class GlueWrapper:
    """Encapsulates AWS Glue actions."""
    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client
    
    def get_data_quality_rule_recommendation_run(self, run_id):
        """
        Gets the specified recommendation run that was used to generate rules.

        :param run_id: The id of the data quality recommendation run
        
        """
        try:
            response = self.client.get_data_quality_rule_recommendation_run(RunId=run_id)
        except ClientError as err:
            logger.error(
                "Couldn't get data quality recommendation run %. Here's why: %s: %s", run_id,
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
        else:
            return response
```

从上面的响应对象中，您可以提取运行推荐的规则集，以便在后续步骤中使用：

```
print(response['RecommendedRuleset'])

Rules = [
    RowCount between 2000 and 8000,
    IsComplete "col1",
    IsComplete "col2",
    StandardDeviation "col3" between 58138330.8 and 64258155.09,
    ColumnValues "col4" between 1000042965 and 1214474826,
    IsComplete "col5"
]
```

**要获取可以筛选和列出的所有建议的列表，请执行以下操作：**

```
response = client.list_data_quality_rule_recommendation_runs(
    Filter={
        'DataSource': {
            'GlueTable': {
                'DatabaseName': '<database_name>',
                'TableName': '<table_name>'
            }
        }
)
```

**要取消现有 AWS Glue Data Quality 建议任务，请执行以下操作：**

```
response = client.cancel_data_quality_rule_recommendation_run(
    RunId='dqrun-d4b6b01957fdd79e59866365bf9cb0e40fxxxxxxx'
)
```

## 使用 AWS Glue Data Quality 规则集
<a name="using-apis-rulesets"></a>

**要创建 AWS Glue Data Quality 规则集，请执行以下操作：**

```
response = client.create_data_quality_ruleset(
    Name='<ruleset_name>',
    Ruleset='Rules = [IsComplete "col1", IsPrimaryKey "col2", RowCount between 2000 and 8000]',
    TargetTable={
        'TableName': '<table_name>',
        'DatabaseName': '<database_name>'
    }
)
```

**要创建数据质量规则集，请执行以下操作：**

```
response = client.get_data_quality_ruleset(
    Name='<ruleset_name>'
)
print(response)
```

然后，您可以使用此 API 提取规则集：

```
print(response['Ruleset'])
```

**要列出表的所有数据质量规则集，请执行以下操作：**

```
response = client.list_data_quality_rulesets()
```

您可以使用 API 中的筛选条件来筛选附加到特定数据库或表的所有规则集：

```
response = client.list_data_quality_rulesets(
    Filter={
        'TargetTable': {
            'TableName': '<table_name>',
            'DatabaseName': '<database_name>'
        }
    },
)
```

**要更新数据质量规则集，请执行以下操作：**

```
class GlueWrapper:
    """Encapsulates AWS Glue actions."""
    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client
        
    def update_data_quality_ruleset(self, ruleset_name, ruleset_string):
        """
        Update an AWS Glue Data Quality Ruleset
        
        :param ruleset_name: The name of the AWS Glue Data Quality ruleset to update
        :param ruleset_string: The DQDL ruleset string to update the ruleset with

        """
        try:
            response = self.client.update_data_quality_ruleset(
                Name=ruleset_name,
                Ruleset=ruleset_string
            )
        except ClientError as err:
            logger.error(
                "Couldn't update the AWS Glue Data Quality ruleset. Here's why: %s: %s", 
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
        else:
            return response
```

**要删除数据质量规则集，请执行以下操作：**

```
class GlueWrapper:
    """Encapsulates AWS Glue actions."""
    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client
        
    def delete_data_quality_ruleset(self, ruleset_name):
        """
        Delete a AWS Glue Data Quality Ruleset
        
        :param ruleset_name: The name of the AWS Glue Data Quality ruleset to delete

        """
        try:
            response = self.client.delete_data_quality_ruleset(
                Name=ruleset_name
            )
        except ClientError as err:
            logger.error(
                "Couldn't delete the AWS Glue Data Quality ruleset. Here's why: %s: %s", 
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
        else:
            return response
```

## 使用 AWS Glue Data Quality 运行
<a name="using-apis-runs"></a>

**要启动 AWS Glue Data Quality 运行，请执行以下操作：**

```
class GlueWrapper:
    """Encapsulates AWS Glue actions."""
    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client
        
    def start_data_quality_ruleset_evaluation_run(self, database_name, table_name, role_name, ruleset_list):
        """
        Start an AWS Glue Data Quality evaluation run
        
        :param database_name: The name of the AWS Glue database which contains the dataset.
        :param table_name: The name of the AWS Glue table against which we want to evaluate.
        :param role_arn: The Amazon Resource Name (ARN) of an AWS Identity and Access Management (IAM) role that grants permission to let AWS Glue access the resources it needs. 
        :param ruleset_list: The list of AWS Glue Data Quality ruleset names to evaluate.

        """
        try:
            response = client.start_data_quality_ruleset_evaluation_run(
                DataSource={
                    'GlueTable': {
                        'DatabaseName': database_name,
                        'TableName': table_name
                    }
                },
                Role=role_name,
                RulesetNames=ruleset_list
            )
        except ClientError as err:
            logger.error(
                "Couldn't start the AWS Glue Data Quality Run. Here's why: %s: %s", 
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
        else:
            return response['RunId']
```

请记住，您可以传递 `pushDownPredicate` 或 `catalogPartitionPredicate` 参数，以确保您的数据质量运行仅针对目录表中的一组特定分区。例如：

```
response = client.start_data_quality_ruleset_evaluation_run(
    DataSource={
        'GlueTable': {
            'DatabaseName': '<database_name>',
            'TableName': '<table_name>',
            'AdditionalOptions': {
                'pushDownPredicate': 'year=2023'
            }
        }
    },
    Role='<role_name>',
    NumberOfWorkers=5,
    Timeout=123,
    AdditionalRunOptions={
        'CloudWatchMetricsEnabled': False
    },
    RulesetNames=[
        '<ruleset_name>',
    ]
)
```

 您还可以在“行”级或“列”级配置如何评估规则集中的复合规则。有关复合规则工作原理的更多信息，请参阅文档中的[复合规则的工作原理](dqdl.md#dqdl-syntax-composite-rules)。

 关于如何在请求中设置复合规则评估方法的示例：

```
response = client.start_data_quality_ruleset_evaluation_run(
    DataSource={
        'GlueTable': {
            'DatabaseName': '<database_name>',
            'TableName': '<table_name>',
            'AdditionalOptions': {
                'pushDownPredicate': 'year=2023'
            }
        }
    },
    Role='<role_name>',
    NumberOfWorkers=5,
    Timeout=123,
    AdditionalRunOptions={
        'CompositeRuleEvaluationMethod':ROW
    },
    RulesetNames=[
        '<ruleset_name>',
    ]
)
```

 **要获取有关 AWS Glue Data Quality 运行的信息，请执行以下操作：**

```
class GlueWrapper:
    """Encapsulates AWS Glue actions."""
    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client
        
    def get_data_quality_ruleset_evaluation_run(self, run_id):
        """
        Get details about an AWS Glue Data Quality Run
        
        :param run_id: The AWS Glue Data Quality run ID to look up

        """
        try:
            response = self.client.get_data_quality_ruleset_evaluation_run(
                RunId=run_id
            )
        except ClientError as err:
            logger.error(
                "Couldn't look up the AWS Glue Data Quality run ID. Here's why: %s: %s", 
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
        else:
            return response
```

**要获取 AWS Glue Data Quality 运行的结果，请执行以下操作：**

对于给定的 AWS Glue Data Quality 运行，您可以使用以下方法提取运行的评估结果：

```
response = client.get_data_quality_ruleset_evaluation_run(
    RunId='d4b6b01957fdd79e59866365bf9cb0e40fxxxxxxx'
)

resultID = response['ResultIds'][0]

response = client.get_data_quality_result(
    ResultId=resultID
)

print(response['RuleResults'])
```

**要列出所有的 AWS Glue Data Quality 运行，请执行以下操作：**

```
class GlueWrapper:
    """Encapsulates AWS Glue actions."""
    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client
        
    def list_data_quality_ruleset_evaluation_runs(self, database_name, table_name):
        """
        Lists all the AWS Glue Data Quality runs against a given table
        
        :param database_name: The name of the database where the data quality runs 
        :param table_name: The name of the table against which the data quality runs were created
        
        """
        try:
            response = self.client.list_data_quality_ruleset_evaluation_runs(
                Filter={
                    'DataSource': {
                        'GlueTable': {
                            'DatabaseName': database_name,
                            'TableName': table_name
                        }
                    }
                }
            )
        except ClientError as err:
            logger.error(
                "Couldn't list the AWS Glue Quality runs. Here's why: %s: %s", 
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
        else:
            return response
```

您可以修改筛选器子句，使其仅显示特定时间之间的结果或针对特定表运行的结果。

**要停止正在进行的 AWS Glue Data Quality 运行，请执行以下操作：**

```
class GlueWrapper:
    """Encapsulates AWS Glue actions."""
    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client
        
    def cancel_data_quality_ruleset_evaluation_run(self, result_id):
        """
        Cancels a given AWS Glue Data Quality run
        
        :param result_id: The result id of a AWS Glue Data Quality run to cancel
        
        """
        try:
            response = self.client.cancel_data_quality_ruleset_evaluation_run(
                ResultId=result_id
            )
        except ClientError as err:
            logger.error(
                "Couldn't cancel the AWS Glue Data Quality run. Here's why: %s: %s", 
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
        else:
            return response
```

## 处理 AWS Glue Data Quality 结果
<a name="using-apis-results"></a>

 **要获取 AWS Glue Data Quality 运行结果，请执行以下操作：**

```
class GlueWrapper:
    """Encapsulates AWS Glue actions."""
    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client
        
    def get_data_quality_result(self, result_id):
        """
        Outputs the result of an AWS Glue Data Quality Result
        
        :param result_id: The result id of an AWS Glue Data Quality run
        
        """
        try:
            response = self.client.get_data_quality_result(
                ResultId=result_id
            )
        except ClientError as err:
            logger.error(
                "Couldn't get the AWS Glue Data Quality result. Here's why: %s: %s", 
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
        else:
            return response
```

 **要查看为给定数据质量结果收集的统计信息，请执行以下操作：**

```
import boto3
from botocore.exceptions import ClientError
import logging

logger = logging.getLogger(__name__)
class GlueWrapper:
    """Encapsulates AWS Glue actions."""
    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client
        
    def get_profile_for_data_quality_result(self, result_id):
        """
        Outputs the statistic profile for a AWS Glue Data Quality Result
        
        :param result_id: The result id of a AWS Glue Data Quality run
        
        """
        try:
            response = self.glue_client.get_data_quality_result(
                ResultId=result_id
            )
            
            # the profile contains all statistics gathered for the result
            profile_id = response['ProfileId']
            profile = self.glue_client.list_data_quality_statistics(
                ProfileId = profile_id
            )            
            return profile                        
        except ClientError as err:
            logger.error(
                "Couldn't retrieve Data Quality profile. Here's why: %s: %s", 
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
```

 **要查看在多次数据质量运行中收集的统计信息的时间序列，请执行以下操作：**

```
class GlueWrapper:
    """Encapsulates AWS Glue actions."""
    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client

    def get_statistics_for_data_quality_result(self, profile_id):
        """
        Outputs an array of datapoints for each statistic in the input result.

        :param result_id: The profile id of a AWS Glue Data Quality run

        """
        try:
            profile = self.glue_client.list_data_quality_statistics(
                ProfileId = profile_id
            )
            statistics = [self.glue_client.list_data_quality_statistics(
                StatisticId = s['StatisticId']
            ) for s in profile['Statistics']]
            return statistics
        except ClientError as err:
            logger.error(
                "Couldn't retrieve Data Quality statistics. Here's why: %s: %s",
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
```

 **要查看特定统计信息的异常检测模型，请执行以下操作：**

```
class GlueWrapper:
    """Encapsulates AWS Glue actions."""
    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client

    def get_model_training_result_for_statistic(self, statistic_id, profile_id):
        """
        Outputs the details (bounds) of anomaly detection training for the given statistic at the given profile.

        :param statistic_id the model's statistic (the timeseries it is tracking)
        :param profile_id the profile associated with the model (a point in the timeseries)

        """
        try:
            model = self.glue_client.get_data_quality_model_result(
                ProfileId = profile_id, StatisticId = statistic_id
            )
            return model
        except ClientError as err:
            logger.error(
                "Couldn't retrieve Data Quality model results. Here's why: %s: %s",
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
```

 **要将某个数据点从其统计模型的异常检测基线中排除，请执行以下操作：**

```
class GlueWrapper:
    """Encapsulates AWS Glue actions."""
    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client

    def apply_exclusions_to_statistic(self, statistic_id, profile_ids):
        """
        Annotate some points along a given statistic timeseries.
        
        This example excludes the provided values; INCLUDE can also be used to undo this action.

        :param statistic_id the statistic timeseries to annotate
        :param profile_id the profiles we want to exclude (points in the timeseries)

        """

        try:
            response = self.glue_client.batch_put_data_quality_statistic_annotation(
                    InclusionAnnotations = [
                        {'ProfileId': prof_id, 
                        'StatisticId': statistic_id, 
                        'InclusionAnnotation': 'EXCLUDE'} for prof_id in profile_ids
                    ]
            )
            return response['FailedInclusionAnnotations']
        except ClientError as err:
            logger.error(
                "Couldn't store Data Quality annotations. Here's why: %s: %s",
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
```

 **要查看特定统计信息的异常检测模型训练状态，请执行以下操作：**

```
class GlueWrapper:
    """Encapsulates AWS Glue actions."""
    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client

    def get_model_training_status_for_statistic(self, statistic_id, profile_id):
        """
        Outputs the status of anomaly detection training for the given statistic at the given profile.

        :param statistic_id the model's statistic (the timeseries it is tracking)
        :param profile_id the profile associated with the model (a point in the timeseries)

        """
        try:
            model = self.glue_client.get_data_quality_model(
                ProfileId = profile_id, StatisticId = statistic_id
            )
            return model
        except ClientError as err:
            logger.error(
                "Couldn't retrieve Data Quality statistics. Here's why: %s: %s",
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
```

 **要将特定数据质量运行的所有结果从异常检测基线中排除，请执行以下操作：**

```
class GlueWrapper:
    """Encapsulates AWS Glue actions."""
    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 AWS Glue client.
        """
        self.glue_client = glue_client

    def apply_exclusions_to_profile(self, profile_id):
        """
        Exclude datapoints produced by a run across statistic timeseries.

        This example excludes the provided values; INCLUDE can also be used to undo this action.

        :param profile_id the profiles we want to exclude (points in the timeseries)

        """
        try:
            response = self.glue_client.put_data_quality_profile_annotation(
                    ProfileId = profile_id,
                    InclusionAnnotation = "EXCLUDE"
            )
            return response
        except ClientError as err:
            logger.error(
                "Couldn't store Data Quality annotations. Here's why: %s: %s",
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
```

 **要获取给定数据质量运行的结果并显示这些结果，请执行以下操作：**

 有了 AWS Glue 数据质量自动监测功能 `runID`，您就可以提取 `resultID`，然后得到实际结果，如下所示：

```
response = client.get_data_quality_ruleset_evaluation_run(
    RunId='dqrun-abca77ee126abe1378c1da1ae0750d7dxxxx'
)

resultID = response['ResultIds'][0]

response = client.get_data_quality_result(
    ResultId=resultID
)

print(resp['RuleResults'])
```