

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 處理受限制的呼叫和已刪除的連接
<a name="handling-errors"></a>

如果您超過每秒最大交易數 (TPS)，導致服務限制您的應用程序或連接斷開時，Amazon Textract 操作可能會失敗。例如，如果您在短時間內對 Amazon Textract 操作進行過多調用，則會限制您的呼叫並發送`ProvisionedThroughputExceededException`錯誤的操作響應。如需 Amazon Textract TPS 配額的詳細資訊，請參閲[Amazon Textract 配額](https://docs.aws.amazon.com/general/latest/gr/textract.html)。

您可以通過自動重試操作來管理限制和斷開的連接。您可以指定重試次數，方法是在`Config`參數，當您創建 Amazon Textract 客户端時。我們建議重試計數 5。所以此AWSSDK 在失敗和拋出異常之前按指定次數重試操作。如需詳細資訊，請參閱 [AWS 中的錯誤重試與指數退避](https://docs.aws.amazon.com/general/latest/gr/api-retries.html)。

**注意**  
自動重試適用於同步操作和異步操作。在指定自動重試之前，請確保您擁有最新版本的 AWS 開發工具包。如需詳細資訊，請參閱 [步驟 2：設定AWS CLI和AWS開發套件](setup-awscli-sdk.md)。

以下範例説明如何在您處理多個檔案時自動重試 Amazon Textract 操作。

**先決條件**
+ 如果您尚未：

  1. 使用`AmazonTextractFullAccess`和`AmazonS3ReadOnlyAccess`許可。如需詳細資訊，請參閱 [步驟 1：設定 AWS 帳户並建立 IAM 使用者](setting-up.md#setting-up-iam)。

  1. 安裝並設定 AWS CLI 和 AWS SDK。如需詳細資訊，請參閱 [步驟 2：設定AWS CLI和AWS開發套件](setup-awscli-sdk.md)。

**自動重試操作**

1. 將多個檔案映像上傳至 S3 儲存貯體以運行同步範例。將多頁面檔上傳至 S3 儲存貯體，然後運行`StartDocumentTextDetection`來運行異步示例。

   如需說明，請參閱「」[將數據元上傳至 Amazon S3](https://docs.aws.amazon.com/AmazonS3/latest/user-guide/UploadingObjectsintoAmazonS3.html)中的*Amazon Simple Storage Service 用户指南*。

1. 以下範例演示如何使用`Config`參數自動重試操作。同步示例調用`DetectDocumentText`操作，而異步示例調用`GetDocumentTextDetection`operation.

------
#### [ Sync Example ]

   使用以下範例來調用`DetectDocumentText`操作，以取代您 Amazon S3 儲存貯體中的檔案。In`main`中，變更的值`bucket`至您的 S3 儲存貯體。變更的值`documents`設置為您在步驟 2 中上傳的文檔圖像的名稱。

   ```
   import boto3
   from botocore.client import Config
   # Documents
   
   def process_multiple_documents(bucket, documents):
       
       config = Config(retries = dict(max_attempts = 5))
    
       # Amazon Textract client
       textract = boto3.client('textract', config=config)
    
       for documentName in documents:
    
           print("\nProcessing: {}\n==========================================".format(documentName))
    
           # Call Amazon Textract
           response = textract.detect_document_text(
               Document={
                   'S3Object': {
                       'Bucket': bucket,
                       'Name': documentName
                   }
               })
    
           # Print detected text
           for item in response["Blocks"]:
               if item["BlockType"] == "LINE":
                   print ('\033[94m' +  item["Text"] + '\033[0m')
   
   
   def main():
       bucket = ""
       documents = ["document-image-1.png",
       "document-image-2.png", "document-image-3.png",
       "document-image-4.png", "document-image-5.png" ]
       process_multiple_documents(bucket, documents)
   
   
   
   if __name__ == "__main__":
       main()
   ```

------
#### [ Async Example ]

   使用下列範例來呼叫 `GetDocumentTextDetection` 操作。它假設您已經調用`StartDocumentTextDetection`取代為您 Amazon S3 儲存貯體中的檔案，並獲得`JobId`。In`main`中，變更的值`bucket`變更到您的 S3 儲存貯體中，並將`roleArn`添加到分配給文本角色的 Arn。您還需要變更的值`document`變更為您 Amazon S3 儲存貯體中的多頁面檔名稱。最後，取代`region_name`取代為您所在地區的名稱，並提供`GetResults`函數的名稱`jobId`。

   ```
   import boto3
   from botocore.client import Config
   
   class DocumentProcessor:
       jobId = ''
       region_name = ''
   
       roleArn = ''
       bucket = ''
       document = ''
   
       sqsQueueUrl = ''
       snsTopicArn = ''
       processType = ''
   
       def __init__(self, role, bucket, document, region):
           self.roleArn = role
           self.bucket = bucket
           self.document = document
           self.region_name = region
           self.config = Config(retries = dict(max_attempts = 5))
   
           self.textract = boto3.client('textract', region_name=self.region_name, config=self.config)
           self.sqs = boto3.client('sqs')
           self.sns = boto3.client('sns')
   
   # Display information about a block
       def DisplayBlockInfo(self, block):
   
           print("Block Id: " + block['Id'])
           print("Type: " + block['BlockType'])
           if 'EntityTypes' in block:
               print('EntityTypes: {}'.format(block['EntityTypes']))
   
           if 'Text' in block:
               print("Text: " + block['Text'])
   
           if block['BlockType'] != 'PAGE':
               print("Confidence: " + "{:.2f}".format(block['Confidence']) + "%")
   
           print('Page: {}'.format(block['Page']))
   
           if block['BlockType'] == 'CELL':
               print('Cell Information')
               print('\tColumn: {} '.format(block['ColumnIndex']))
               print('\tRow: {}'.format(block['RowIndex']))
               print('\tColumn span: {} '.format(block['ColumnSpan']))
               print('\tRow span: {}'.format(block['RowSpan']))
   
               if 'Relationships' in block:
                   print('\tRelationships: {}'.format(block['Relationships']))
   
           print('Geometry')
           print('\tBounding Box: {}'.format(block['Geometry']['BoundingBox']))
           print('\tPolygon: {}'.format(block['Geometry']['Polygon']))
   
           if block['BlockType'] == 'SELECTION_ELEMENT':
               print('    Selection element detected: ', end='')
               if block['SelectionStatus'] == 'SELECTED':
                   print('Selected')
               else:
                   print('Not selected')
   
       def GetResults(self, jobId):
           maxResults = 1000
           paginationToken = None
           finished = False
   
           while finished == False:
   
               response = None
   
               if paginationToken == None:
                   response = self.textract.get_document_text_detection(JobId=jobId,
                                                                            MaxResults=maxResults)
               else:
                   response = self.textract.get_document_text_detection(JobId=jobId,
                                                                            MaxResults=maxResults,
                                                                            NextToken=paginationToken)
   
               blocks = response['Blocks']
               print('Detected Document Text')
               print('Pages: {}'.format(response['DocumentMetadata']['Pages']))
   
               # Display block information
               for block in blocks:
                   self.DisplayBlockInfo(block)
                   print()
                   print()
   
               if 'NextToken' in response:
                   paginationToken = response['NextToken']
               else:
                   finished = True
   
   def main():
       roleArn = 'role-arn'
       bucket = 'bucket-name'
       document = 'document-name'
       region_name = 'region-name'
       analyzer = DocumentProcessor(roleArn, bucket, document, region_name)
       analyzer.GetResults("job-id")
   
   if __name__ == "__main__":
       main()
   ```

------