Using the CloudTrail Processing Library
The CloudTrail Processing Library is a Java library that provides an easy way to process AWS CloudTrail logs. You provide configuration details about your CloudTrail SQS queue and write code to process events. The CloudTrail Processing Library does the rest. It polls your Amazon SQS queue, reads and parses queue messages, downloads CloudTrail log files, parses events in the log files, and passes the events to your code as Java objects.
The CloudTrail Processing Library is highly scalable and fault-tolerant. It handles parallel processing of log files so that you can process as many logs as needed. It also handles failures caused by network timeouts and inaccessible resources.
This topic shows you how to use the CloudTrail Processing Library to process CloudTrail logs in your Java projects.
The library is provided as an Apache-licensed open-source project, available on GitHub:
https://github.com/aws/aws-cloudtrail-processing-library
Minimum requirements
To use the CloudTrail Processing Library, you must have the following:
Processing CloudTrail logs
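To process CloudTrail logs in your Java application:
-
Add the CloudTrail Processing Library to your project.
-
Configure the CloudTrail Processing Library.
-
Implement the events processor.
-
Instantiate and run the processing executor.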
Adding the CloudTrail Processing Library to your project
To use the CloudTrail Processing Library, add it to your Java project's classpath.
Adding the library to an Apache Ant project
To add the CloudTrail Processing Library to an Apache Ant project
-
Download or clone the CloudTrail Processing Library source code from GitHub: https://github.com/aws/aws-cloudtrail-processing-library
-
Build the .jar file from source as described in the README:
mvn clean install -Dgpg.skip=true
-
Copy the resulting .jar file into your project and add it to your project's build.xml file. For example:
<classpath>
  <pathelement path="${classpath}"/>
  <pathelement location="lib/aws-cloudtrail-processing-library-1.6.1.jar"/>
</classpath>
Adding the library to an Apache Maven project
The CloudTrail Processing Library is available for Apache Maven. You can add it to your project by adding a single dependency to your project's pom.xml file.
To add the CloudTrail Processing Library to a Maven project
-
Open your Maven project's pom.xml file and add the following dependency:
<dependency>
  <groupId>com.amazonaws</groupId>
  <artifactId>aws-cloudtrail-processing-library</artifactId>
  <version>1.6.1</version>
</dependency>
Adding the library to an Eclipse project
To add the CloudTrail Processing Library to an Eclipse project
-
Download or clone the CloudTrail Processing Library source code from GitHub: https://github.com/aws/aws-cloudtrail-processing-library
-
Build the .jar file from source as described in the README:
mvn clean install -Dgpg.skip=true
-
Copy the built aws-cloudtrail-processing-library-1.6.1.jar to a directory in your project (typically lib).
-
Right-click your project's name in the Eclipse Project Explorer, choose Build Path, and then choose Configure Build Path.
-
In the Java Build Path window, choose the Libraries tab.
-
Choose Add JARs... and navigate to the path where you copied aws-cloudtrail-processing-library-1.6.1.jar.
-
Choose OK to complete adding the .jar to your project.
Adding the library to an IntelliJ project
To add the CloudTrail Processing Library to an IntelliJ project
-
Download or clone the CloudTrail Processing Library source code from GitHub: https://github.com/aws/aws-cloudtrail-processing-library
-
Build the .jar file from source as described in the README:
mvn clean install -Dgpg.skip=true
-
From File, choose Project Structure.
-
Choose Modules and then choose Dependencies.
-
Choose +, choose JARs or Directories, and then go to the path where you built aws-cloudtrail-processing-library-1.6.1.jar.
-
Choose Apply and then choose OK to complete adding the .jar to your project.
Configuring the CloudTrail Processing Library
You can configure the CloudTrail Processing Library by creating a classpath properties file that is loaded
at runtime, or by creating a ClientConfiguration object and setting options
manually.
Providing a properties file
You can write a classpath properties file that provides configuration options to your application. The following example file shows the options you can set:
# AWS access key. (Required)
accessKey = your_access_key

# AWS secret key. (Required)
secretKey = your_secret_key

# The SQS URL used to pull CloudTrail notification from. (Required)
sqsUrl = your_sqs_queue_url

# The SQS end point specific to a region.
sqsRegion = us-east-1

# A period of time during which Amazon SQS prevents other consuming components
# from receiving and processing that message.
visibilityTimeout = 60

# The S3 region to use.
s3Region = us-east-1

# Number of threads used to download S3 files in parallel. Callbacks can be
# invoked from any thread.
threadCount = 1

# The time allowed, in seconds, for threads to shut down after
# AWSCloudTrailEventProcessingExecutor.stop() is called. If they are still
# running beyond this time, they will be forcibly terminated.
threadTerminationDelaySeconds = 60

# The maximum number of AWSCloudTrailClientEvents sent to a single invocation
# of processEvents().
maxEventsPerEmit = 10

# Whether to include raw event information in CloudTrailDeliveryInfo.
enableRawEventInfo = false

# Whether to delete SQS message when the CloudTrail Processing Library is unable to process the notification.
deleteMessageUponFailure = false
The following parameters are required:
-
sqsUrl – Provides the URL from which to pull your CloudTrail notifications. If you don't specify this value, the AWSCloudTrailProcessingExecutor throws an IllegalStateException.
-
accessKey – The access key ID for your account, such as AKIAIOSFODNN7EXAMPLE.
-
secretKey – The secret access key that pairs with your access key ID, such as wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY.
The accessKey and secretKey parameters provide your AWS
credentials to the library so the library can access AWS on your behalf.
Defaults for the other parameters are set by the library. For more information, see the AWS CloudTrail Processing Library Reference.
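Before handing a properties file to the library, it can be useful to confirm that the required keys are present. The following is a minimal sketch that uses only java.util.Properties. The class name ProcessingConfigCheck and the method loadAndValidate are hypothetical helpers for illustration and are not part of the CloudTrail Processing Library. Throwing IllegalStateException here only mirrors the behavior described above; it is not the library's own validation code.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Illustrative pre-flight check; NOT part of the CloudTrail Processing Library. */
class ProcessingConfigCheck {

    /** The keys that the section above lists as required. */
    static final String[] REQUIRED_KEYS = {"sqsUrl", "accessKey", "secretKey"};

    /** Parses properties text and fails fast if a required key is missing or blank. */
    static Properties loadAndValidate(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        for (String key : REQUIRED_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                throw new IllegalStateException("Missing required property: " + key);
            }
        }
        return props;
    }

    public static void main(String[] args) {
        String text = String.join("\n",
            "accessKey = your_access_key",
            "secretKey = your_secret_key",
            "sqsUrl = your_sqs_queue_url",
            "threadCount = 4");
        Properties props = loadAndValidate(text);
        // Optional keys fall back to a caller-supplied default when absent.
        int threads = Integer.parseInt(props.getProperty("threadCount", "1"));
        System.out.println("threadCount = " + threads);
    }
}
```

In a real project you would read the text from the same classpath resource that you pass to the library, rather than from an inline string.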
Creating a ClientConfiguration
Instead of setting options in the classpath properties, you can provide options to the
AWSCloudTrailProcessingExecutor by initializing and setting options on a
ClientConfiguration object, as shown in the following example:
ClientConfiguration basicConfig = new ClientConfiguration(
    "http://sqs.us-east-1.amazonaws.com/123456789012/queue2",
    new DefaultAWSCredentialsProviderChain());

basicConfig.setEnableRawEventInfo(true);
basicConfig.setThreadCount(4);
basicConfig.setMaxEventsPerEmit(20);
Implementing the events processor
To process CloudTrail logs, you must implement an EventsProcessor that receives
the CloudTrail log data. The following is an example implementation:
public class SampleEventsProcessor implements EventsProcessor {

    public void process(List<CloudTrailEvent> events) {
        int i = 0;
        for (CloudTrailEvent event : events) {
            System.out.println(String.format("Process event %d : %s", i++, event.getEventData()));
        }
    }
}
When implementing an EventsProcessor, you implement the
process() callback that the AWSCloudTrailProcessingExecutor uses
to send you CloudTrail events. Events are provided in a list of CloudTrailEvent
objects.
Each CloudTrailEvent object provides event data (getEventData()) and
CloudTrailEventMetadata that you can use to read the CloudTrail event and
delivery information.
This simple example prints the event information for each event passed to
SampleEventsProcessor. In your own implementation, you can process logs as
you see fit. The AWSCloudTrailProcessingExecutor continues to send events to
your EventsProcessor as long as it has events to send and is still
running.
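Because the maxEventsPerEmit option caps how many events a single callback invocation receives, a processor should expect to be called repeatedly for a large log file. The following sketch illustrates that batching with a generic helper. BatchEmitSketch and emitInBatches are hypothetical names chosen for this illustration, and the code is an assumption about the observable behavior, not the library's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Illustrates the effect of a per-call event cap; NOT the library's code. */
class BatchEmitSketch {

    /**
     * Passes events to the processor in chunks of at most maxEventsPerEmit
     * and returns the number of callback invocations made.
     */
    static <T> int emitInBatches(List<T> events, int maxEventsPerEmit,
                                 Consumer<List<T>> processor) {
        if (maxEventsPerEmit <= 0) {
            throw new IllegalArgumentException("maxEventsPerEmit must be positive");
        }
        int calls = 0;
        for (int from = 0; from < events.size(); from += maxEventsPerEmit) {
            int to = Math.min(from + maxEventsPerEmit, events.size());
            // Copy the slice so the callback cannot modify the source list.
            processor.accept(new ArrayList<>(events.subList(from, to)));
            calls++;
        }
        return calls;
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        for (int i = 0; i < 25; i++) {
            events.add("event-" + i);
        }
        int calls = emitInBatches(events, 10,
            batch -> System.out.println("process() received " + batch.size() + " events"));
        System.out.println("callback invocations: " + calls);
    }
}
```

Since callbacks can be invoked from any thread when threadCount is greater than 1, any state that a processor shares across invocations should be thread-safe.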
Instantiating and running the processing executor
After you write an EventsProcessor and set configuration values for the
CloudTrail Processing Library (either in a properties file or by using the ClientConfiguration
class), you can use these elements to initialize and use an
AWSCloudTrailProcessingExecutor.
To use AWSCloudTrailProcessingExecutor to process CloudTrail events
-
Instantiate an AWSCloudTrailProcessingExecutor.Builder object. Builder's constructor takes an EventsProcessor object and a classpath properties file name.
-
Call the Builder's build() factory method to configure and obtain an AWSCloudTrailProcessingExecutor object.
-
Use the AWSCloudTrailProcessingExecutor's start() and stop() methods to begin and end CloudTrail event processing.
public class SampleApp {
    public static void main(String[] args) throws InterruptedException {
        AWSCloudTrailProcessingExecutor executor = new AWSCloudTrailProcessingExecutor.Builder(
            new SampleEventsProcessor(),
            "/myproject/cloudtrailprocessing.properties").build();

        executor.start();
        Thread.sleep(24 * 60 * 60 * 1000); // let it run for a while (optional)
        executor.stop(); // optional
    }
}
Advanced topics
Filtering the events to process
By default, all logs in your Amazon SQS queue's S3 bucket and all events that they contain
are sent to your EventsProcessor. The CloudTrail Processing Library provides optional interfaces
that you can implement to filter the sources used to obtain CloudTrail logs and to filter the
events that you are interested in processing.
SourceFilter
You can implement the SourceFilter interface to choose whether you want to process logs from a provided source. SourceFilter declares a single callback method, filterSource(), that receives a CloudTrailSource object. To keep events from a source from being processed, return false from filterSource().
The CloudTrail Processing Library calls the filterSource() method after the library polls for logs on the Amazon SQS queue. This occurs before the library starts event filtering or processing for the logs.
The following is an example implementation:

public class SampleSourceFilter implements SourceFilter {
    private static final int MAX_RECEIVED_COUNT = 3;

    private static List<String> accountIDs;
    static {
        accountIDs = new ArrayList<>();
        accountIDs.add("123456789012");
        accountIDs.add("234567890123");
    }

    @Override
    public boolean filterSource(CloudTrailSource source) throws CallbackException {
        SQSBasedSource sqsSource = (SQSBasedSource) source;
        Map<String, String> sourceAttributes = sqsSource.getSourceAttributes();

        String accountId = sourceAttributes.get(
            SourceAttributeKeys.ACCOUNT_ID.getAttributeKey());
        String receivedCount = sourceAttributes.get(
            SourceAttributeKeys.APPROXIMATE_RECEIVE_COUNT.getAttributeKey());
        int approximateReceivedCount = Integer.parseInt(receivedCount);

        // Accept only known accounts and messages that have not been retried too often.
        return approximateReceivedCount <= MAX_RECEIVED_COUNT && accountIDs.contains(accountId);
    }
}

If you don't provide your own SourceFilter, then DefaultSourceFilter is used, which allows all sources to be processed (it always returns true).
EventFilter
You can implement the EventFilter interface to choose whether a CloudTrail event is sent to your EventsProcessor. EventFilter declares a single callback method, filterEvent(), that receives a CloudTrailEvent object. To keep the event from being processed, return false from filterEvent().
The CloudTrail Processing Library calls the filterEvent() method after the library polls for logs on the Amazon SQS queue and after source filtering. This occurs before the library starts event processing for the logs.
See the following example implementation:

public class SampleEventFilter implements EventFilter {

    private static final String EC2_EVENTS = "ec2.amazonaws.com";

    @Override
    public boolean filterEvent(CloudTrailEvent event) throws CallbackException {
        CloudTrailEventData eventData = event.getEventData();
        String eventSource = eventData.getEventSource();
        String eventName = eventData.getEventName();

        // Keep only EC2 events whose name starts with "Delete".
        return EC2_EVENTS.equals(eventSource) && eventName.startsWith("Delete");
    }
}

If you don't provide your own EventFilter, then DefaultEventFilter is used, which allows all events to be processed (it always returns true).
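The order in which the two filters apply can be sketched with plain Java predicates: a source that is rejected is skipped entirely, and only events from accepted sources reach the event filter. The Source class, the deliver method, and the use of event names as plain strings are hypothetical stand-ins for illustration. They are not the library's SourceFilter or EventFilter types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

/** Illustrates source filtering followed by event filtering; NOT the library's code. */
class FilterOrderSketch {

    /** Hypothetical stand-in for one log source and the event names it contains. */
    static final class Source {
        final String accountId;
        final List<String> eventNames;

        Source(String accountId, List<String> eventNames) {
            this.accountId = accountId;
            this.eventNames = eventNames;
        }
    }

    /** Returns the event names that would reach the events processor. */
    static List<String> deliver(List<Source> sources,
                                Predicate<Source> sourceFilter,
                                Predicate<String> eventFilter) {
        List<String> delivered = new ArrayList<>();
        for (Source source : sources) {
            if (!sourceFilter.test(source)) {
                continue; // rejected source: its events are never inspected
            }
            for (String eventName : source.eventNames) {
                if (eventFilter.test(eventName)) {
                    delivered.add(eventName);
                }
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<Source> sources = Arrays.asList(
            new Source("123456789012", Arrays.asList("DeleteVolume", "RunInstances")),
            new Source("999999999999", Arrays.asList("DeleteBucket")));
        List<String> delivered = deliver(sources,
            s -> s.accountId.equals("123456789012"),
            name -> name.startsWith("Delete"));
        System.out.println(delivered);
    }
}
```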
Processing data events
When CloudTrail processes data events, it preserves numbers in their original format, whether
that is an integer (int) or a float (a number that contains a
decimal). Historically, CloudTrail processed integers in the fields of a data event as floats.
Currently, CloudTrail keeps the numbers in these fields in their original format.
As a best practice, to avoid breaking your automations, be flexible in any code or automation that you are using
to process or filter CloudTrail data events, and allow both int and float formatted numbers. For best results,
use version 1.4.0 or higher of the CloudTrail Processing Library.
The following example snippet shows a float formatted number, 2.0, for the desiredCount parameter in the
requestParameters block of a data event.
"eventName": "CreateService",
"awsRegion": "us-east-1",
"sourceIPAddress": "000.00.00.00",
"userAgent": "console.amazonaws.com",
"requestParameters": {
    "clientToken": "EXAMPLE",
    "cluster": "default",
    "desiredCount": 2.0
...
The following example snippet shows an int formatted number, 2, for the desiredCount
parameter in the requestParameters block of a data event.
"eventName": "CreateService",
"awsRegion": "us-east-1",
"sourceIPAddress": "000.00.00.00",
"userAgent": "console.amazonaws.com",
"requestParameters": {
    "clientToken": "EXAMPLE",
    "cluster": "default",
    "desiredCount": 2
...
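One way to accept both formats is to parse the raw value with java.math.BigDecimal and convert it to an int only when it has no fractional part. The following sketch assumes you have already extracted the field as text (for example, "2" or "2.0"). FlexibleCount and parseWholeNumber are hypothetical names for illustration, not library APIs.

```java
import java.math.BigDecimal;

/** Illustrative helper for reading a count that may arrive as 2 or 2.0. */
class FlexibleCount {

    /**
     * Parses integer or decimal notation into an int.
     * Throws ArithmeticException if the value has a nonzero fractional part
     * (such as 2.5) or does not fit in an int, and NumberFormatException
     * if the text is not a number.
     */
    static int parseWholeNumber(String raw) {
        return new BigDecimal(raw.trim()).intValueExact();
    }

    public static void main(String[] args) {
        System.out.println(parseWholeNumber("2"));
        System.out.println(parseWholeNumber("2.0"));
    }
}
```

Rejecting values such as 2.5 rather than silently truncating them is a design choice for this sketch; automation that should tolerate fractional values would need a different conversion.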
Reporting progress
Implement the ProgressReporter interface to customize the reporting of
CloudTrail Processing Library progress. ProgressReporter declares two methods:
reportStart() and reportEnd(), which are called at the beginning
and end of the following operations:
-
Polling messages from Amazon SQS
-
Parsing messages from Amazon SQS
-
Processing an Amazon SQS source for CloudTrail logs
-
Deleting messages from Amazon SQS
-
Downloading a CloudTrail log file
-
Processing a CloudTrail log file
Both methods receive a ProgressStatus object that contains information
about the operation that was performed. The progressState member holds a member
of the ProgressState enumeration that identifies the current operation. The
progressInfo member can contain additional information about the operation.
Additionally, any object that you return from reportStart() is passed to
reportEnd(), so you can provide contextual information such as the time when
the event began processing.
The following is an example implementation that provides information about how long an operation took to complete:
public class SampleProgressReporter implements ProgressReporter {
    private static final Log logger = LogFactory.getLog(SampleProgressReporter.class);

    @Override
    public Object reportStart(ProgressStatus status) {
        // The returned object is passed back to reportEnd() as context.
        return new Date();
    }

    @Override
    public void reportEnd(ProgressStatus status, Object startDate) {
        System.out.println(status.getProgressState().toString() + " is "
            + status.getProgressInfo().isSuccess() + " , and latency is "
            + Math.abs(((Date) startDate).getTime() - new Date().getTime())
            + " milliseconds.");
    }
}
If you don't implement your own ProgressReporter, then
DefaultProgressReporter, which prints the name of the state being run, is
used instead.
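The way a context object flows from reportStart() to reportEnd() can be sketched without the library. The Reporter interface below is a hypothetical, simplified stand-in for ProgressReporter (it takes an operation name instead of a ProgressStatus), and runReported is an assumed illustration of how a caller might bracket an operation. The sketch uses System.nanoTime() for the context value because it is not affected by wall-clock adjustments.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrates passing context from a start callback to an end callback. */
class ProgressTimingSketch {

    /** Hypothetical, simplified stand-in for the library's ProgressReporter. */
    interface Reporter {
        Object reportStart(String operation);
        void reportEnd(String operation, Object context);
    }

    /** Records how long each reported operation took. */
    static final class TimingReporter implements Reporter {
        final List<String> lines = new ArrayList<>();

        @Override
        public Object reportStart(String operation) {
            return System.nanoTime(); // boxed to Long and handed back in reportEnd()
        }

        @Override
        public void reportEnd(String operation, Object context) {
            long start = (Long) context;
            long elapsedMs = (System.nanoTime() - start) / 1_000_000;
            lines.add(operation + " took " + elapsedMs + " ms");
        }
    }

    /** Brackets a unit of work with the two callbacks, even if the work throws. */
    static void runReported(Reporter reporter, String operation, Runnable work) {
        Object context = reporter.reportStart(operation);
        try {
            work.run();
        } finally {
            reporter.reportEnd(operation, context);
        }
    }

    public static void main(String[] args) {
        TimingReporter reporter = new TimingReporter();
        runReported(reporter, "DOWNLOAD_LOG", () -> { /* simulated work */ });
        reporter.lines.forEach(System.out::println);
    }
}
```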
Handling errors
The ExceptionHandler interface allows you to provide special handling when
an exception occurs during log processing. ExceptionHandler declares a single
callback method, handleException(), which receives a
ProcessingLibraryException object with context about the exception that
occurred.
You can use the passed-in ProcessingLibraryException's
getStatus() method to find out what operation was executed when the exception
occurred and get additional information about the status of the operation.
ProcessingLibraryException is derived from Java's standard
Exception class, so you can also retrieve information about the exception by
invoking any of the exception methods.
See the following example implementation:
public class SampleExceptionHandler implements ExceptionHandler {
    private static final Log logger = LogFactory.getLog(SampleExceptionHandler.class);

    @Override
    public void handleException(ProcessingLibraryException exception) {
        ProgressStatus status = exception.getStatus();
        ProgressState state = status.getProgressState();
        ProgressInfo info = status.getProgressInfo();

        System.err.println(String.format(
            "Exception. Progress State: %s. Progress Information: %s.", state, info));
    }
}
If you don't provide your own ExceptionHandler, then
DefaultExceptionHandler, which prints a standard error message, is used
instead.
Note
If the deleteMessageUponFailure parameter is true, the
CloudTrail Processing Library does not distinguish general exceptions from processing errors and may delete
queue messages.
-
For example, you use the SourceFilter to filter messages by timestamp.
-
However, you don't have the required permissions to access the S3 bucket that receives the CloudTrail log files. Because you don't have the required permissions, an AmazonServiceException is thrown. The CloudTrail Processing Library wraps this in a CallbackException.
-
The DefaultExceptionHandler logs this as an error, but does not identify the root cause, which is that you don't have the required permissions. The CloudTrail Processing Library considers this a processing error and deletes the message, even if the message includes a valid CloudTrail log file.
If you want to filter messages with SourceFilter, verify that your
ExceptionHandler can distinguish service exceptions from processing errors.
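One way to tell a wrapped service exception from a genuine processing error is to walk the exception's cause chain with Throwable.getCause(). The following sketch uses only standard Java. ServiceFailure is a hypothetical stand-in for a service-side exception such as AmazonServiceException, and RootCauseSketch, causedBy, and rootCause are illustrative names, not library APIs.

```java
/** Illustrates inspecting a wrapped exception's cause chain. */
class RootCauseSketch {

    /** Hypothetical stand-in for a service-side failure such as access denied. */
    static class ServiceFailure extends RuntimeException {
        ServiceFailure(String message) {
            super(message);
        }
    }

    /** Returns true if the exception or any of its causes is of the given type. */
    static boolean causedBy(Throwable error, Class<? extends Throwable> type) {
        for (Throwable current = error; current != null; current = current.getCause()) {
            if (type.isInstance(current)) {
                return true;
            }
        }
        return false;
    }

    /** Returns the innermost cause, or the exception itself if it has none. */
    static Throwable rootCause(Throwable error) {
        Throwable current = error;
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        Exception wrapped = new Exception("callback failed",
            new ServiceFailure("Access Denied"));
        System.out.println("service failure: " + causedBy(wrapped, ServiceFailure.class));
        System.out.println("root cause: " + rootCause(wrapped).getMessage());
    }
}
```

An ExceptionHandler could apply the same check to the ProcessingLibraryException it receives and, for example, log service failures separately so that permission problems are not mistaken for bad log files.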
Additional resources
For more information about the CloudTrail Processing Library, see the following:
-
CloudTrail Processing Library GitHub project, which includes sample code that demonstrates how to implement a CloudTrail Processing Library application.