

# Send CloudWatch Logs to Firehose


CloudWatch Logs events can be sent to Firehose using CloudWatch subscription filters. For more information, see [Subscription filters with Amazon Data Firehose](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/SubscriptionFilters.html#FirehoseExample).

CloudWatch Logs events are sent to Firehose in gzip-compressed format. If you want to deliver decompressed log events to Firehose destinations, you can use the decompression feature in Firehose to automatically decompress CloudWatch Logs.
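
For context, the following sketch shows what a consumer has to do itself when the decompression feature is not enabled. The sample payload is a hypothetical record shaped like a CloudWatch Logs subscription delivery; only the gzip handling is the point here.

```python
import gzip
import json

# Hypothetical sample payload in the shape CloudWatch Logs sends to Firehose.
payload = {
    "messageType": "DATA_MESSAGE",
    "owner": "111111111111",
    "logGroup": "CloudTrail/logs",
    "logStream": "111111111111_CloudTrail/logs_us-east-1",
    "subscriptionFilters": ["Destination"],
    "logEvents": [
        {"id": "0", "timestamp": 1432826855000, "message": "hello"}
    ],
}

# CloudWatch Logs delivers each record to Firehose gzip-compressed.
compressed = gzip.compress(json.dumps(payload).encode("utf-8"))

# Without the Firehose decompression feature, the consumer must gunzip the
# record body itself before it can parse the JSON.
decoded = json.loads(gzip.decompress(compressed))
print(decoded["logGroup"])  # CloudTrail/logs
```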

**Important**  
Currently, Firehose does not support the delivery of CloudWatch Logs to an Amazon OpenSearch Service destination, because Amazon CloudWatch combines multiple log events into one Firehose record and Amazon OpenSearch Service cannot accept multiple log events in one record. As an alternative, consider [Using subscription filters for Amazon OpenSearch Service in CloudWatch Logs](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/CWL_OpenSearch_Stream.html).

# Decompress CloudWatch Logs


If you use Firehose to deliver CloudWatch Logs and want to deliver decompressed data to your Firehose stream destination, or if you want to use Firehose [Data Format Conversion](https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html) (Parquet, ORC) or [Dynamic partitioning](https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html), you must enable decompression for your Firehose stream.

You can enable decompression using the AWS Management Console, the AWS Command Line Interface, or the AWS SDKs.

**Note**  
If you enable the decompression feature on a stream, use that stream exclusively for CloudWatch Logs subscription filters, and not for Vended Logs. If you enable the decompression feature on a stream that ingests both CloudWatch Logs and Vended Logs, the Vended Logs ingestion to Firehose fails. The decompression feature applies only to CloudWatch Logs.

# Extract message after decompression of CloudWatch Logs


When you enable decompression, you can optionally also enable message extraction. With message extraction, Firehose filters out all metadata in the decompressed CloudWatch Logs records, such as `owner`, `logGroup`, and `logStream`, and delivers only the content of the message fields. If you deliver data to a Splunk destination, you must turn on message extraction so that Splunk can parse the data. The following are sample outputs after decompression, with and without message extraction.

Fig 1: Sample output after decompression without message extraction:

```
{
  "owner": "111111111111",
  "logGroup": "CloudTrail/logs",
  "logStream": "111111111111_CloudTrail/logs_us-east-1",
  "subscriptionFilters": [
    "Destination"
  ],
  "messageType": "DATA_MESSAGE",
  "logEvents": [
    {
      "id": "31953106606966983378809025079804211143289615424298221568",
      "timestamp": 1432826855000,
      "message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root1\"}}"
    },
    {
      "id": "31953106606966983378809025079804211143289615424298221569",
      "timestamp": 1432826855000,
      "message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root2\"}}"
    },
    {
      "id": "31953106606966983378809025079804211143289615424298221570",
      "timestamp": 1432826855000,
      "message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root3\"}}"
    }
  ]
}
```

Fig 2: Sample output after decompression with message extraction:

```
{"eventVersion":"1.03","userIdentity":{"type":"Root1"}}
{"eventVersion":"1.03","userIdentity":{"type":"Root2"}}
{"eventVersion":"1.03","userIdentity":{"type":"Root3"}}
```
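
The transformation from Fig 1 to Fig 2 can be sketched as follows. This is a simplified illustration of the behavior, not Firehose's implementation: it parses a decompressed record, drops the metadata fields, and emits only the newline-delimited message contents.

```python
import json

def extract_messages(decompressed_record: str) -> str:
    """Return only the message field of each log event, newline-delimited,
    dropping metadata such as owner, logGroup, and logStream."""
    record = json.loads(decompressed_record)
    return "\n".join(event["message"] for event in record["logEvents"])

# Sample input shaped like the decompressed record in Fig 1.
sample = json.dumps({
    "owner": "111111111111",
    "logGroup": "CloudTrail/logs",
    "logStream": "111111111111_CloudTrail/logs_us-east-1",
    "subscriptionFilters": ["Destination"],
    "messageType": "DATA_MESSAGE",
    "logEvents": [
        {"id": "1", "timestamp": 1432826855000,
         "message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root1\"}}"},
        {"id": "2", "timestamp": 1432826855000,
         "message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root2\"}}"},
    ],
})

# Prints the newline-delimited messages, as in Fig 2.
print(extract_messages(sample))
```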

# Enable decompression on a new Firehose stream from the console


**To enable decompression on a new Firehose stream using the AWS Management Console**

1. Sign in to the AWS Management Console and open the Kinesis console at [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis).

1. Choose **Amazon Data Firehose** in the navigation pane.

1. Choose **Create Firehose stream**.

1. Under **Choose source and destination**:  
**Source**  
The source of your Firehose stream. Choose one of the following sources:  
   + **Direct PUT** – Choose this option to create a Firehose stream that producer applications write to directly. For a list of AWS services, agents, and open source services that are integrated with Direct PUT in Firehose, see [this](create-name.md) section.
   + **Kinesis stream** – Choose this option to configure a Firehose stream that uses a Kinesis data stream as a data source. You can then use Firehose to read data easily from an existing Kinesis data stream and load it into destinations. For more information, see [Writing to Firehose Using Kinesis Data Streams](https://docs.aws.amazon.com/firehose/latest/dev/writing-with-kinesis-streams.html).  
**Destination**  
The destination of your Firehose stream. Choose one of the following:  
   + Amazon S3
   + Splunk

1. Under **Firehose stream name**, enter a name for your stream.

1. (Optional) Under **Transform records**:
   + In the **Decompress source records from Amazon CloudWatch Logs** section, choose **Turn on decompression**.
   + If you want to use message extraction after decompression, choose **Turn on message extraction**.
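
As a rough sketch of the equivalent with the AWS SDK for Python (Boto3), the following builds the processing configuration that turns on decompression and message extraction. The processor types and parameter names shown here reflect our reading of the Firehose API reference and should be verified against the current API; the stream, role, and bucket names are hypothetical placeholders.

```python
# Processing configuration enabling decompression and, optionally, message
# extraction. Processor types and parameter names are assumptions based on
# the Firehose API reference; verify before use.
processing_configuration = {
    "Enabled": True,
    "Processors": [
        {
            "Type": "Decompression",
            "Parameters": [
                {"ParameterName": "CompressionFormat", "ParameterValue": "GZIP"}
            ],
        },
        {
            "Type": "CloudWatchLogProcessing",  # message extraction
            "Parameters": [
                {"ParameterName": "DataMessageExtraction", "ParameterValue": "true"}
            ],
        },
    ],
}

# With Boto3, this would be passed inside the destination configuration, e.g.:
# import boto3
# firehose = boto3.client("firehose")
# firehose.create_delivery_stream(
#     DeliveryStreamName="my-cw-logs-stream",  # hypothetical name
#     DeliveryStreamType="DirectPut",
#     ExtendedS3DestinationConfiguration={
#         "RoleARN": "arn:aws:iam::111111111111:role/firehose-role",
#         "BucketARN": "arn:aws:s3:::my-destination-bucket",
#         "ProcessingConfiguration": processing_configuration,
#     },
# )
print(processing_configuration["Processors"][0]["Type"])  # Decompression
```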

# Enable decompression on an existing Firehose stream


This section provides instructions for enabling decompression on existing Firehose streams. It covers two scenarios – streams with Lambda processing disabled and streams with Lambda processing already enabled. The following sections outline step-by-step procedures for each case, including the creation or modification of Lambda functions, updating Firehose settings, and monitoring CloudWatch metrics to ensure successful implementation of the built-in Firehose decompression feature.

## Enabling decompression when Lambda processing is disabled


To enable decompression on an existing Firehose stream that does not have Lambda processing enabled, you must first enable Lambda processing. This requirement applies only to existing streams. The following steps show how to enable decompression on such streams.

1. Create a Lambda function. You can either create a simple pass-through function or use this [blueprint](https://github.com/aws-samples/aws-kinesis-firehose-resources/tree/main/blueprints/kinesis-firehose-cloudwatch-logs-processor) to create a new Lambda function.

1. Update your current Firehose stream to enable Lambda processing and use the Lambda function that you created for processing.

1. After you update the stream with the new Lambda function, return to the Firehose console and enable decompression.

1. Disable the Lambda processing that you enabled in step 2. You can now delete the Lambda function that you created in step 1.
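
The pass-through Lambda function mentioned in step 1 can be sketched as follows. This is a minimal illustration, not the blueprint itself: it returns every record unchanged so that Lambda processing can be enabled without altering the data. The event and response shapes follow the Firehose data transformation contract (`recordId`, `result`, base64 `data`).

```python
# Minimal pass-through Firehose transformation Lambda: marks every record
# "Ok" and echoes its data back unchanged.
def lambda_handler(event, context):
    return {
        "records": [
            {"recordId": r["recordId"], "result": "Ok", "data": r["data"]}
            for r in event["records"]
        ]
    }

# Local smoke test with a fake event (data is base64, as Firehose sends it).
sample_event = {"records": [{"recordId": "1", "data": "SGVsbG8="}]}
print(lambda_handler(sample_event, None)["records"][0]["result"])  # Ok
```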

## Enabling decompression when Lambda processing is enabled


If your Firehose stream already uses a Lambda function to perform decompression, you can replace it with the built-in Firehose decompression feature. Before you proceed, review your Lambda function code to confirm that it only performs decompression or message extraction. The output of your Lambda function should look similar to the examples shown in [Fig 1 or Fig 2](Message_extraction.md). If the output looks similar, you can replace the Lambda function using the following steps.

1. Replace your current Lambda function with this [blueprint](https://github.com/aws-samples/aws-kinesis-firehose-resources/tree/main/blueprints/kinesis-firehose-cloudwatch-logs-processor). The blueprint Lambda function automatically detects whether the incoming data is compressed, and performs decompression only when it is.

1. Turn on decompression using the built-in Firehose option for decompression.

1. Enable CloudWatch metrics for your Firehose stream if it's not already enabled. Monitor the metric `CloudWatchProcessorLambda_IncomingCompressedData` and wait until this metric changes to zero. This confirms that all input data sent to your Lambda function is decompressed and the Lambda function is no longer required.

1. Remove the Lambda data transformation because you no longer need it to decompress your stream.
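
The metric check in step 3 could be scripted roughly as follows. The namespace and dimension names reflect standard Firehose CloudWatch metrics (`AWS/Firehose`, `DeliveryStreamName`), and the stream name is a hypothetical placeholder; verify both against your account before relying on this.

```python
from datetime import datetime, timedelta, timezone

# Request parameters for polling the metric named in step 3 over the last
# hour; when every datapoint sums to zero, the Lambda is no longer needed.
params = {
    "Namespace": "AWS/Firehose",
    "MetricName": "CloudWatchProcessorLambda_IncomingCompressedData",
    "Dimensions": [
        {"Name": "DeliveryStreamName", "Value": "my-cw-logs-stream"}  # hypothetical
    ],
    "StartTime": datetime.now(timezone.utc) - timedelta(hours=1),
    "EndTime": datetime.now(timezone.utc),
    "Period": 300,
    "Statistics": ["Sum"],
}

# import boto3
# cloudwatch = boto3.client("cloudwatch")
# resp = cloudwatch.get_metric_statistics(**params)
# if all(dp["Sum"] == 0 for dp in resp["Datapoints"]):
#     print("No compressed data reaching the Lambda; safe to remove it.")
print(params["MetricName"])
```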

# Disable decompression on a Firehose stream


**To disable decompression on a Firehose stream using the AWS Management Console**

1. Sign in to the AWS Management Console and open the Kinesis console at [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis).

1. Choose **Amazon Data Firehose** in the navigation pane.

1. Choose the Firehose stream that you want to edit.

1. On the **Firehose stream details** page, choose the **Configuration** tab.

1. In the **Transform and convert records** section, choose **Edit**.

1. Under **Decompress source records from Amazon CloudWatch Logs**, clear **Turn on decompression** and then choose **Save changes**.
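
A rough sketch of the equivalent with the AWS SDK for Python (Boto3): disabling decompression amounts to updating the destination so that the processing configuration no longer carries the decompression processors. The calls shown are illustrative and commented out; the stream name is a hypothetical placeholder.

```python
# An empty processing configuration, i.e. no decompression processors.
disabled_processing = {"Enabled": False, "Processors": []}

# import boto3
# firehose = boto3.client("firehose")
# desc = firehose.describe_delivery_stream(DeliveryStreamName="my-cw-logs-stream")
# stream = desc["DeliveryStreamDescription"]
# firehose.update_destination(
#     DeliveryStreamName="my-cw-logs-stream",  # hypothetical name
#     CurrentDeliveryStreamVersionId=stream["VersionId"],
#     DestinationId=stream["Destinations"][0]["DestinationId"],
#     ExtendedS3DestinationUpdate={"ProcessingConfiguration": disabled_processing},
# )
print(disabled_processing["Enabled"])  # False
```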

# Troubleshoot decompression in Firehose


The following table shows how Firehose handles errors during data decompression and processing, including delivering records to an error S3 bucket, logging errors, and emitting metrics. It also explains the error message returned for unauthorized data put operations.


| Issue | Solution | 
| --- | --- | 
| What happens to the source data in case of an error during decompression? |  If Amazon Data Firehose is not able to decompress a record, the record is delivered as is (in compressed format) to the error S3 bucket that you specified when you created the Firehose stream. Along with the record, the delivered object also includes the error code and error message, and these objects are delivered to an S3 bucket prefix called `decompression-failed`. Firehose continues to process other records after a record fails decompression.  | 
| What happens to the source data in case of an error in the processing pipeline after successful decompression? |  If Amazon Data Firehose encounters an error in a processing step after decompression, such as Dynamic Partitioning or Data Format Conversion, the record is delivered in compressed format to the error S3 bucket that you specified when you created the Firehose stream. Along with the record, the delivered object also includes the error code and error message.  | 
| How are you informed in case of an error or an exception? |  If an error or exception occurs during decompression and you have configured CloudWatch Logs, Firehose logs error messages to CloudWatch Logs. Firehose also sends metrics to CloudWatch that you can monitor, and you can optionally create alarms based on the metrics that Firehose emits.  | 
| What happens when put operations don't come from CloudWatch Logs? | When put operations do not originate from CloudWatch Logs, Firehose returns the following error message: <pre>Put to Firehose failed for AccountId: <accountID>, FirehoseName:  <firehosename> because the request is not originating from allowed source types.</pre> | 
| What metrics does Firehose emit for the decompression feature? | Firehose emits metrics for the decompression of every record. Select a period (1 minute), the Sum statistic, and a date range to get the number of `DecompressedRecords` or `DecompressedBytes` that failed or succeeded. For more information, see [CloudWatch Logs Decompression Metrics](monitoring-with-cloudwatch-metrics.md#decompression-metrics-cw). | 