Commit d11fdf95f7f2d8655b95490a00b4043ff7d0523c

Authored by Alex Mukha
1 parent 14ae9c1c43
Exists in master

Added the aggregation service (creates the latest view of data).

Showing 17 changed files with 643 additions and 92 deletions   Show diff stats
aggregation-service/pom.xml
... ... @@ -0,0 +1,65 @@
  1 +<?xml version="1.0" encoding="UTF-8"?>
  2 +<project xmlns="http://maven.apache.org/POM/4.0.0"
  3 + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4 + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5 + <modelVersion>4.0.0</modelVersion>
  6 + <artifactId>heartbeat-aggregation</artifactId>
  7 + <version>2.0-SNAPSHOT</version>
  8 +
  9 + <parent>
  10 + <artifactId>heartbeat-parent</artifactId>
  11 + <groupId>org.alfresco</groupId>
  12 + <version>2.0-SNAPSHOT</version>
  13 + <relativePath>../pom.xml</relativePath>
  14 + </parent>
  15 +
  16 + <dependencies>
  17 + <dependency>
  18 + <groupId>com.amazonaws</groupId>
  19 + <artifactId>aws-lambda-java-core</artifactId>
  20 + </dependency>
  21 + <dependency>
  22 + <groupId>com.amazonaws</groupId>
  23 + <artifactId>aws-lambda-java-events</artifactId>
  24 + </dependency>
  25 + <dependency>
  26 + <groupId>com.amazonaws</groupId>
  27 + <artifactId>aws-java-sdk-dynamodb</artifactId>
  28 + </dependency>
  29 + <dependency>
  30 + <groupId>com.fasterxml.jackson.core</groupId>
  31 + <artifactId>jackson-core</artifactId>
  32 + </dependency>
  33 + <dependency>
  34 + <groupId>junit</groupId>
  35 + <artifactId>junit</artifactId>
  36 + <scope>test</scope>
  37 + </dependency>
  38 + <dependency>
  39 + <groupId>org.mockito</groupId>
  40 + <artifactId>mockito-all</artifactId>
  41 + <scope>test</scope>
  42 + </dependency>
  43 + </dependencies>
  44 +
  45 + <build>
  46 + <plugins>
  47 + <plugin>
  48 + <groupId>org.apache.maven.plugins</groupId>
  49 + <artifactId>maven-shade-plugin</artifactId>
  50 + <configuration>
  51 + <createDependencyReducedPom>false</createDependencyReducedPom>
  52 + </configuration>
  53 + <executions>
  54 + <execution>
  55 + <phase>package</phase>
  56 + <goals>
  57 + <goal>shade</goal>
  58 + </goals>
  59 + </execution>
  60 + </executions>
  61 + </plugin>
  62 + </plugins>
  63 + </build>
  64 +
  65 +</project>
0 66 \ No newline at end of file
... ...
aggregation-service/src/main/java/org/alfresco/heartbeat/processor/IngestionTableEventProcessor.java
... ... @@ -0,0 +1,33 @@
  1 +package org.alfresco.heartbeat.processor;
  2 +
  3 +import com.amazonaws.services.lambda.runtime.Context;
  4 +import com.amazonaws.services.lambda.runtime.RequestHandler;
  5 +import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
  6 +import com.amazonaws.services.lambda.runtime.events.DynamodbEvent.DynamodbStreamRecord;
  7 +import org.alfresco.heartbeat.service.AggregationService;
  8 +
  9 +/**
  10 + * DynamoDB event lister Lambda. Aggregates the new data to form the latest view.
  11 + *
  12 + * @author amukha
  13 + */
  14 +public class IngestionTableEventProcessor implements RequestHandler<DynamodbEvent, String>
  15 +{
  16 + @Override
  17 + public String handleRequest(DynamodbEvent ddbEvent, Context context)
  18 + {
  19 + AggregationService aggregationService = AggregationService.getInstance();
  20 + for (DynamodbStreamRecord record : ddbEvent.getRecords()){
  21 + try
  22 + {
  23 + aggregationService.aggregateData(record.getDynamodb().getNewImage());
  24 + }
  25 + catch (IllegalArgumentException iae)
  26 + {
  27 + context.getLogger().log("[ERROR] The record [" + record.getEventName() + "]" + record.getEventID() + " is invalid" +
  28 + " and will be skipped: " + record);
  29 + }
  30 + }
  31 + return "Successfully processed " + ddbEvent.getRecords().size() + " records.";
  32 + }
  33 +}
... ...
aggregation-service/src/main/java/org/alfresco/heartbeat/service/AggregationDAO.java
... ... @@ -0,0 +1,62 @@
  1 +package org.alfresco.heartbeat.service;
  2 +
  3 +import com.amazonaws.SDKGlobalConfiguration;
  4 +import com.amazonaws.regions.Region;
  5 +import com.amazonaws.regions.Regions;
  6 +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
  7 +import com.amazonaws.services.dynamodbv2.document.DynamoDB;
  8 +import com.amazonaws.services.dynamodbv2.document.Item;
  9 +import com.amazonaws.services.dynamodbv2.document.PutItemOutcome;
  10 +import com.amazonaws.services.dynamodbv2.document.Table;
  11 +
  12 +/**
  13 + * Aggregation DAO layer
  14 + *
  15 + * @author amukha
  16 + */
  17 +public class AggregationDAO
  18 +{
  19 + static final String TABLE_NAME_KEY = "TABLE_NAME";
  20 +
  21 + private Table table;
  22 +
  23 + /**
  24 + * The CF uses environment variable to set the table name to lambda during creation of the stack.
  25 + * This method will return the table name from the environment variable in case of amazon deployment.
  26 + * @return the table name from the environment variable of amazon deployment.
  27 + */
  28 + public String getTableName()
  29 + {
  30 + return System.getenv(TABLE_NAME_KEY);
  31 + }
  32 +
  33 + public AggregationDAO()
  34 + {
  35 + AmazonDynamoDBClient client = new AmazonDynamoDBClient();
  36 + String envRegion = System.getenv(SDKGlobalConfiguration.AWS_REGION_ENV_VAR);
  37 + client.setRegion(Region.getRegion(Regions.fromName(envRegion)));
  38 + DynamoDB dynamoDB = new DynamoDB(client);
  39 + table = dynamoDB.getTable(getTableName());
  40 + }
  41 +
  42 + public AggregationDAO(DynamoDB dynamoDB, String tableName)
  43 + {
  44 + table = dynamoDB.getTable(tableName);
  45 + }
  46 +
  47 + /**
  48 + * The method creates or replaces the existing item in DB
  49 + */
  50 + public PutItemOutcome putAggregatedItem(String repositoryId,
  51 + String feature,
  52 + String version,
  53 + String payload)
  54 + {
  55 + Item item = new Item()
  56 + .withPrimaryKey(AggregationService.REPOSITORY_ID_KEY, repositoryId)
  57 + .withString(AggregationService.FEATURE_KEY, feature)
  58 + .withString(AggregationService.VERSION_KEY, version)
  59 + .withString(AggregationService.PAYLOAD_KEY, payload);
  60 + return table.putItem(item);
  61 + }
  62 +}
... ...
aggregation-service/src/main/java/org/alfresco/heartbeat/service/AggregationService.java
... ... @@ -0,0 +1,76 @@
  1 +package org.alfresco.heartbeat.service;
  2 +
  3 +import com.amazonaws.services.dynamodbv2.model.AttributeValue;
  4 +
  5 +import java.util.Map;
  6 +
  7 +/**
  8 + * Service implementation
  9 + *
  10 + * @author amukha
  11 + */
  12 +public class AggregationService
  13 +{
  14 + public static final String REPOSITORY_ID_KEY = "repositoryId";
  15 + public static final String FEATURE_KEY = "feature";
  16 + public static final String VERSION_KEY = "version";
  17 + public static final String PAYLOAD_KEY = "payload";
  18 +
  19 + private AggregationDAO aggregationDAO;
  20 +
  21 + private static AggregationService instance;
  22 +
  23 + /**
  24 + * Default constructor for testing purposes.
  25 + * {@link AggregationService#getInstance()} should be used instead of this.
  26 + */
  27 + AggregationService()
  28 + {
  29 + }
  30 +
  31 + private AggregationService(AggregationDAO aggregationDAO)
  32 + {
  33 + this.aggregationDAO = aggregationDAO;
  34 + }
  35 +
  36 + public static AggregationService getInstance()
  37 + {
  38 + if (instance == null)
  39 + {
  40 + instance = new AggregationService(new AggregationDAO());
  41 + }
  42 + return instance;
  43 + }
  44 +
  45 + /**
  46 + * Setter for testing purposes
  47 + */
  48 + public void setAggregationDAO(AggregationDAO aggregationDAO)
  49 + {
  50 + this.aggregationDAO = aggregationDAO;
  51 + }
  52 +
  53 + public void aggregateData(Map<String, AttributeValue> newImage) throws IllegalArgumentException
  54 + {
  55 + String repositoryId = newImage.get(REPOSITORY_ID_KEY) == null ? null : newImage.get(REPOSITORY_ID_KEY).getS();
  56 + String feature = newImage.get(FEATURE_KEY) == null ? null : newImage.get(FEATURE_KEY).getS();
  57 + String version = newImage.get(VERSION_KEY) == null ? null : newImage.get(VERSION_KEY).getS();
  58 + String payload = newImage.get(PAYLOAD_KEY) == null ? null : newImage.get(PAYLOAD_KEY).getS();
  59 + if (!isMandatoryDataValid(repositoryId, feature, version, payload))
  60 + {
  61 + throw new IllegalArgumentException("The data does not have all the required parameters: " + newImage);
  62 + }
  63 + aggregationDAO.putAggregatedItem(repositoryId, feature, version, payload);
  64 + }
  65 +
  66 + private boolean isMandatoryDataValid(String repositoryId,
  67 + String feature,
  68 + String version,
  69 + String payload)
  70 + {
  71 + return !(repositoryId == null || repositoryId.isEmpty() ||
  72 + feature == null || feature.isEmpty() ||
  73 + version == null || version.isEmpty() ||
  74 + payload == null || payload.isEmpty());
  75 + }
  76 +}
... ...
aggregation-service/src/test/java/org/alfresco/heartbeat/service/AggregationDAOTest.java
... ... @@ -0,0 +1,133 @@
  1 +package org.alfresco.heartbeat.service;
  2 +
  3 +import com.amazonaws.regions.Region;
  4 +import com.amazonaws.regions.Regions;
  5 +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
  6 +import com.amazonaws.services.dynamodbv2.document.DynamoDB;
  7 +import com.amazonaws.services.dynamodbv2.document.Item;
  8 +import com.amazonaws.services.dynamodbv2.document.PrimaryKey;
  9 +import com.amazonaws.services.dynamodbv2.document.Table;
  10 +import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
  11 +import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
  12 +import com.amazonaws.services.dynamodbv2.model.KeyType;
  13 +import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
  14 +import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType;
  15 +import org.junit.AfterClass;
  16 +import org.junit.BeforeClass;
  17 +import org.junit.Test;
  18 +
  19 +import java.util.ArrayList;
  20 +import java.util.List;
  21 +
  22 +import static org.junit.Assert.assertEquals;
  23 +import static org.junit.Assert.assertNotNull;
  24 +
  25 +/**
  26 + * Integration tests for {@link AggregationDAO}
  27 + *
  28 + * @author amukha
  29 + */
  30 +public class AggregationDAOTest
  31 +{
  32 + private static DynamoDB dynamoDB;
  33 + private static final String TABLE_NAME = AggregationDAOTest.class.getName() + "-" + System.currentTimeMillis();
  34 +
  35 + @BeforeClass
  36 + public static void beforeClass() throws Exception
  37 + {
  38 + AmazonDynamoDBClient client = new AmazonDynamoDBClient();
  39 + // hardcoded region for tests
  40 + client.setRegion(Region.getRegion(Regions.EU_WEST_1));
  41 + dynamoDB = new DynamoDB(client);
  42 + createTable(TABLE_NAME);
  43 + }
  44 +
  45 + @AfterClass
  46 + public static void afterClass() throws Exception
  47 + {
  48 + Table table = dynamoDB.getTable(TABLE_NAME);
  49 + table.delete();
  50 + table.waitForDelete();
  51 + }
  52 +
  53 +
  54 + private static void createTable(String tableName) throws Exception
  55 + {
  56 + List<KeySchemaElement> keySchema = new ArrayList<>();
  57 + keySchema.add(new KeySchemaElement(AggregationService.REPOSITORY_ID_KEY, KeyType.HASH));
  58 + keySchema.add(new KeySchemaElement(AggregationService.FEATURE_KEY, KeyType.RANGE));
  59 + List<AttributeDefinition> attributeDefinitions = new ArrayList<>();
  60 + attributeDefinitions.add(new AttributeDefinition(AggregationService.REPOSITORY_ID_KEY, ScalarAttributeType.S));
  61 + attributeDefinitions.add(new AttributeDefinition(AggregationService.FEATURE_KEY, ScalarAttributeType.S));
  62 + Table table = dynamoDB.createTable(
  63 + tableName,
  64 + keySchema,
  65 + attributeDefinitions,
  66 + new ProvisionedThroughput(new Long(5), new Long(5)));
  67 + table.waitForActive();
  68 + }
  69 +
  70 + @Test
  71 + public void verifyTableExists()
  72 + {
  73 + Table table = dynamoDB.getTable(TABLE_NAME);
  74 + assertNotNull("The table should not be null.", table);
  75 + }
  76 +
  77 + @Test
  78 + public void putValidItem()
  79 + {
  80 + String repositoryId = String.format(AggregationServiceTest.REPOSITORY_ID, 1);
  81 + String feature = String.format(AggregationServiceTest.FEATURE, 1);
  82 + String version = String.format(AggregationServiceTest.VERSION, 1);
  83 + String payload = String.format(AggregationServiceTest.PAYLOAD, 1);
  84 + AggregationDAO dao = new AggregationDAO(dynamoDB, TABLE_NAME);
  85 + dao.putAggregatedItem(repositoryId, feature, version, payload);
  86 + Table table = dynamoDB.getTable(TABLE_NAME);
  87 + PrimaryKey primaryKey = new PrimaryKey(
  88 + AggregationService.REPOSITORY_ID_KEY,
  89 + repositoryId,
  90 + AggregationService.FEATURE_KEY,
  91 + feature);
  92 + Item item = table.getItem(primaryKey);
  93 + assertNotNull("The item should not be null.", item);
  94 + assertEquals("The values of the saved item should match.",
  95 + repositoryId, item.getString(AggregationService.REPOSITORY_ID_KEY));
  96 + assertEquals("The values of the saved item should match.",
  97 + feature, item.getString(AggregationService.FEATURE_KEY));
  98 + assertEquals("The values of the saved item should match.",
  99 + version, item.getString(AggregationService.VERSION_KEY));
  100 + assertEquals("The values of the saved item should match.",
  101 + payload, item.getString(AggregationService.PAYLOAD_KEY));
  102 + }
  103 +
  104 + @Test
  105 + public void updateItem()
  106 + {
  107 + String repositoryId = String.format(AggregationServiceTest.REPOSITORY_ID, 2);
  108 + String feature = String.format(AggregationServiceTest.FEATURE, 2);
  109 + String version = String.format(AggregationServiceTest.VERSION, 2);
  110 + String payload = String.format(AggregationServiceTest.PAYLOAD, 2);
  111 + AggregationDAO dao = new AggregationDAO(dynamoDB, TABLE_NAME);
  112 + dao.putAggregatedItem(repositoryId, feature, version, payload);
  113 + // change the payload
  114 + payload = String.format(AggregationServiceTest.PAYLOAD, 10);
  115 + dao.putAggregatedItem(repositoryId, feature, version, payload);
  116 + Table table = dynamoDB.getTable(TABLE_NAME);
  117 + PrimaryKey primaryKey = new PrimaryKey(
  118 + AggregationService.REPOSITORY_ID_KEY,
  119 + repositoryId,
  120 + AggregationService.FEATURE_KEY,
  121 + feature);
  122 + Item item = table.getItem(primaryKey);
  123 + assertNotNull("The item should not be null.", item);
  124 + assertEquals("The values of the saved item should match.",
  125 + repositoryId, item.getString(AggregationService.REPOSITORY_ID_KEY));
  126 + assertEquals("The values of the saved item should match.",
  127 + feature, item.getString(AggregationService.FEATURE_KEY));
  128 + assertEquals("The values of the saved item should match.",
  129 + version, item.getString(AggregationService.VERSION_KEY));
  130 + assertEquals("The values of the saved item should match.",
  131 + payload, item.getString(AggregationService.PAYLOAD_KEY));
  132 + }
  133 +}
... ...
aggregation-service/src/test/java/org/alfresco/heartbeat/service/AggregationServiceTest.java
... ... @@ -0,0 +1,85 @@
  1 +package org.alfresco.heartbeat.service;
  2 +
  3 +import com.amazonaws.services.dynamodbv2.model.AttributeValue;
  4 +import org.junit.Before;
  5 +import org.junit.Test;
  6 +
  7 +import java.util.HashMap;
  8 +import java.util.Map;
  9 +
  10 +import static org.junit.Assert.fail;
  11 +import static org.mockito.Matchers.eq;
  12 +import static org.mockito.Mockito.mock;
  13 +import static org.mockito.Mockito.verify;
  14 +
  15 +/**
  16 + * Tests for {@link AggregationService}
  17 + *
  18 + * @author amukha
  19 + */
  20 +public class AggregationServiceTest
  21 +{
  22 + private AggregationDAO dao = mock(AggregationDAO.class);
  23 + private AggregationService aggregationService;
  24 +
  25 + static final String REPOSITORY_ID = "test-repository-id-%d";
  26 + static final String FEATURE = "Alfresco Content Services %d";
  27 + static final String VERSION = "5.2.%d";
  28 + static final String PAYLOAD = "{\"users\":\"%d\",\"documents\":\"1000\"}";
  29 +
  30 + @Before
  31 + public void before()
  32 + {
  33 + aggregationService = new AggregationService();
  34 + aggregationService.setAggregationDAO(dao);
  35 + }
  36 +
  37 + @Test
  38 + public void testPutValidData() throws Exception
  39 + {
  40 + Map<String, AttributeValue> newImage = buildDataImage(1);
  41 + aggregationService.aggregateData(newImage);
  42 + verify(dao).putAggregatedItem(
  43 + eq(newImage.get(AggregationService.REPOSITORY_ID_KEY).getS()),
  44 + eq(newImage.get(AggregationService.FEATURE_KEY).getS()),
  45 + eq(newImage.get(AggregationService.VERSION_KEY).getS()),
  46 + eq(newImage.get(AggregationService.PAYLOAD_KEY).getS()));
  47 + }
  48 +
  49 + @Test
  50 + public void testPutInvalidData() throws Exception
  51 + {
  52 +
  53 + // The payload is missing
  54 + Map<String, AttributeValue> newImage = new HashMap<>();
  55 + newImage.put(AggregationService.REPOSITORY_ID_KEY, new AttributeValue(String.format(REPOSITORY_ID, 1)));
  56 + newImage.put(AggregationService.FEATURE_KEY, new AttributeValue(String.format(FEATURE, 1)));
  57 + newImage.put(AggregationService.VERSION_KEY, new AttributeValue(String.format(VERSION, 1)));
  58 + try
  59 + {
  60 + aggregationService.aggregateData(newImage);
  61 + fail("The IllegalArgumentException should have been thrown.");
  62 + }
  63 + catch (IllegalArgumentException iae)
  64 + {
  65 + // expected
  66 + }
  67 + }
  68 +
  69 + /**
  70 + * @param i unique number to differentiate the data sets based on template strings
  71 + */
  72 + private Map<String, AttributeValue> buildDataImage(int i)
  73 + {
  74 + String repositoryId = String.format(REPOSITORY_ID, i);
  75 + String feature = String.format(FEATURE, i);
  76 + String version = String.format(VERSION, i);
  77 + String payload = String.format(PAYLOAD, i);
  78 + Map<String, AttributeValue> newImage = new HashMap<>();
  79 + newImage.put(AggregationService.REPOSITORY_ID_KEY, new AttributeValue(repositoryId));
  80 + newImage.put(AggregationService.FEATURE_KEY, new AttributeValue(feature));
  81 + newImage.put(AggregationService.VERSION_KEY, new AttributeValue(version));
  82 + newImage.put(AggregationService.PAYLOAD_KEY, new AttributeValue(payload));
  83 + return newImage;
  84 + }
  85 +}
... ...
heartbeat-template.yaml
... ... @@ -9,13 +9,13 @@ Resources:
9 9 Runtime: java8
10 10 MemorySize: 512
11 11 Timeout: 15
12   - CodeUri: ./target/heartbeat-2.0-SNAPSHOT.jar
  12 + CodeUri: ./ingestion-service/target/heartbeat-ingestion-2.0-SNAPSHOT.jar
13 13 Policies:
14 14 - AmazonDynamoDBFullAccess
15 15 Environment:
16 16 Variables:
17 17 TABLE_NAME:
18   - Ref: Table
  18 + Ref: IngestionTable
19 19 Events:
20 20 PostRequest:
21 21 Type: Api
... ... @@ -23,7 +23,7 @@ Resources:
23 23 Path: /
24 24 Method: post
25 25  
26   - Table:
  26 + IngestionTable:
27 27 Type: AWS::DynamoDB::Table
28 28 Properties:
29 29 KeySchema:
... ... @@ -42,7 +42,7 @@ Resources:
42 42 AttributeType: 'S'
43 43 # Declare columns once the secondary indexes are described
44 44 # -
45   -# AttributeName: "uuid"
  45 +# AttributeName: "repositoryId"
46 46 # AttributeType: 'S'
47 47 # -
48 48 # AttributeName: "feature"
... ... @@ -56,3 +56,56 @@ Resources:
56 56 ProvisionedThroughput:
57 57 ReadCapacityUnits: "5"
58 58 WriteCapacityUnits: "5"
  59 + StreamSpecification:
  60 + StreamViewType: "NEW_IMAGE"
  61 +
  62 + ProcessDynamoDBStream:
  63 + Type: AWS::Serverless::Function
  64 + Properties:
  65 + Handler: org.alfresco.heartbeat.processor.IngestionTableEventProcessor
  66 + Runtime: java8
  67 + MemorySize: 512
  68 + Timeout: 15
  69 + CodeUri: ./aggregation-service/target/heartbeat-aggregation-2.0-SNAPSHOT.jar
  70 + Policies:
  71 + - AWSLambdaDynamoDBExecutionRole
  72 + - AmazonDynamoDBFullAccess
  73 + Environment:
  74 + Variables:
  75 + TABLE_NAME:
  76 + Ref: AggregationTable
  77 + Events:
  78 + Stream:
  79 + Type: DynamoDB
  80 + Properties:
  81 + Stream: !GetAtt IngestionTable.StreamArn
  82 + BatchSize: 100
  83 + StartingPosition: TRIM_HORIZON
  84 +
  85 + AggregationTable:
  86 + Type: AWS::DynamoDB::Table
  87 + Properties:
  88 + KeySchema:
  89 + -
  90 + AttributeName: "repositoryId"
  91 + KeyType: "HASH"
  92 + -
  93 + AttributeName: "feature"
  94 + KeyType: "RANGE"
  95 + AttributeDefinitions:
  96 + -
  97 + AttributeName: 'repositoryId'
  98 + AttributeType: 'S'
  99 + -
  100 + AttributeName: 'feature'
  101 + AttributeType: 'S'
  102 +# Declare columns once the secondary indexes are described
  103 +# -
  104 +# AttributeName: "version"
  105 +# AttributeType: 'S'
  106 +# -
  107 +# AttributeName: "payload"
  108 +# AttributeType: 'S'
  109 + ProvisionedThroughput:
  110 + ReadCapacityUnits: "5"
  111 + WriteCapacityUnits: "5"
59 112 \ No newline at end of file
... ...
ingestion-service/pom.xml
... ... @@ -0,0 +1,61 @@
  1 +<?xml version="1.0" encoding="UTF-8"?>
  2 +<project xmlns="http://maven.apache.org/POM/4.0.0"
  3 + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4 + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5 + <modelVersion>4.0.0</modelVersion>
  6 + <artifactId>heartbeat-ingestion</artifactId>
  7 + <version>2.0-SNAPSHOT</version>
  8 +
  9 + <parent>
  10 + <artifactId>heartbeat-parent</artifactId>
  11 + <groupId>org.alfresco</groupId>
  12 + <version>2.0-SNAPSHOT</version>
  13 + <relativePath>../pom.xml</relativePath>
  14 + </parent>
  15 +
  16 + <dependencies>
  17 + <dependency>
  18 + <groupId>com.amazonaws</groupId>
  19 + <artifactId>aws-lambda-java-core</artifactId>
  20 + </dependency>
  21 + <dependency>
  22 + <groupId>com.amazonaws</groupId>
  23 + <artifactId>aws-java-sdk-dynamodb</artifactId>
  24 + </dependency>
  25 + <dependency>
  26 + <groupId>com.fasterxml.jackson.core</groupId>
  27 + <artifactId>jackson-core</artifactId>
  28 + </dependency>
  29 + <dependency>
  30 + <groupId>junit</groupId>
  31 + <artifactId>junit</artifactId>
  32 + <scope>test</scope>
  33 + </dependency>
  34 + <dependency>
  35 + <groupId>org.mockito</groupId>
  36 + <artifactId>mockito-all</artifactId>
  37 + <scope>test</scope>
  38 + </dependency>
  39 + </dependencies>
  40 +
  41 + <build>
  42 + <plugins>
  43 + <plugin>
  44 + <groupId>org.apache.maven.plugins</groupId>
  45 + <artifactId>maven-shade-plugin</artifactId>
  46 + <configuration>
  47 + <createDependencyReducedPom>false</createDependencyReducedPom>
  48 + </configuration>
  49 + <executions>
  50 + <execution>
  51 + <phase>package</phase>
  52 + <goals>
  53 + <goal>shade</goal>
  54 + </goals>
  55 + </execution>
  56 + </executions>
  57 + </plugin>
  58 + </plugins>
  59 + </build>
  60 +
  61 +</project>
0 62 \ No newline at end of file
... ...
ingestion-service/src/main/java/org/alfresco/heartbeat/handler/HeartbeatRequestHandler.java
1 1 index 3ebd973028912df9aed6d98d8ac4e65749f81d0b..2203dd64e0b78ac824ccc8f5ba25ae9f28d59028 100644
2   --- a/src/main/java/org/alfresco/heartbeat/handler/HeartbeatRequestHandler.java
  2 +++ b/ingestion-service/src/main/java/org/alfresco/heartbeat/handler/HeartbeatRequestHandler.java
... ... @@ -14,17 +14,16 @@ import org.apache.http.HttpStatus;
14 14 */
15 15 public class HeartbeatRequestHandler implements RequestHandler<HeartbeatRequest, HeartbeatResponse>
16 16 {
17   - private HeartbeatService heartbeatService;
18 17  
19 18 @Override
20 19 public HeartbeatResponse handleRequest(HeartbeatRequest request, Context context)
21 20 {
22 21 context.getLogger().log("Received object: " + request);
23   - heartbeatService = HeartbeatService.getInstance();
  22 + HeartbeatService heartbeatService = HeartbeatService.getInstance();
24 23 HeartbeatResponse response = new HeartbeatResponse();
25 24 try
26 25 {
27   - String responseBody = heartbeatService.saveData(request.getBody());
  26 + String responseBody = heartbeatService.ingestData(request.getBody());
28 27 response.setStatusCode(HttpStatus.SC_CREATED);
29 28 response.setBody(responseBody);
30 29 }
... ...
ingestion-service/src/main/java/org/alfresco/heartbeat/http/HeartbeatRequest.java
ingestion-service/src/main/java/org/alfresco/heartbeat/http/HeartbeatResponse.java
ingestion-service/src/main/java/org/alfresco/heartbeat/schema/DynamoDBSchemaCheck.java
ingestion-service/src/main/java/org/alfresco/heartbeat/service/HeartbeatDAO.java
1 1 index 2a2d1c25b7d183dd8906e56a45364ced10370a32..a914f1bb10beba22c64af5f7d3ef8fefae810922 100644
2   --- a/src/main/java/org/alfresco/heartbeat/service/HeartbeatDAO.java
  2 +++ b/ingestion-service/src/main/java/org/alfresco/heartbeat/service/HeartbeatDAO.java
... ... @@ -12,7 +12,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
12 12 import com.fasterxml.jackson.databind.node.ObjectNode;
13 13  
14 14 /**
15   - * DAO layer
  15 + * Ingestion DAO layer
16 16 *
17 17 * @author amukha
18 18 */
... ... @@ -49,29 +49,29 @@ public class HeartbeatDAO
49 49 table = dynamoDB.getTable(tableName);
50 50 }
51 51  
52   - public PutItemOutcome putItem(String uuid,
  52 + public PutItemOutcome putItem(String repositoryId,
53 53 String feature,
54 54 String version,
55 55 String timestamp,
56 56 String payload)
57 57 {
58   - String primaryPartitionKey = createPrimaryPartitionKey(uuid, feature);
  58 + String primaryPartitionKey = createPrimaryPartitionKey(repositoryId, feature);
59 59  
60 60 Item item = new Item()
61 61 .withPrimaryKey(PRIMARY_PARTITION_KEY, primaryPartitionKey)
62 62 .withString(TIMESTAMP_KEY, timestamp)
63   - .withString(HeartbeatService.UUID_KEY, uuid)
  63 + .withString(HeartbeatService.REPOSITORY_ID_KEY, repositoryId)
64 64 .withString(HeartbeatService.FEATURE_KEY, feature)
65 65 .withString(HeartbeatService.VERSION_KEY, version)
66 66 .withString(HeartbeatService.PAYLOAD_KEY, payload);
67 67 return table.putItem(item);
68 68 }
69 69  
70   - public String createPrimaryPartitionKey(String uuid, String feature)
  70 + public String createPrimaryPartitionKey(String repositoryId, String feature)
71 71 {
72 72 ObjectMapper mapper = new ObjectMapper();
73 73 ObjectNode json = mapper.createObjectNode();
74   - json.put(HeartbeatService.UUID_KEY, uuid);
  74 + json.put(HeartbeatService.REPOSITORY_ID_KEY, repositoryId);
75 75 json.put(HeartbeatService.FEATURE_KEY, feature);
76 76 return json.toString();
77 77 }
... ...
ingestion-service/src/main/java/org/alfresco/heartbeat/service/HeartbeatService.java
1 1 index 89daba03924885f3dab371a9c02f4ec46ef2bce7..5d327f035dfb9a05079bfe99791f67379acf4acd 100644
2   --- a/src/main/java/org/alfresco/heartbeat/service/HeartbeatService.java
  2 +++ b/ingestion-service/src/main/java/org/alfresco/heartbeat/service/HeartbeatService.java
... ... @@ -13,7 +13,7 @@ import java.io.IOException;
13 13 */
14 14 public class HeartbeatService
15 15 {
16   - static final String UUID_KEY = "uuid";
  16 + static final String REPOSITORY_ID_KEY = "repositoryId";
17 17 static final String FEATURE_KEY = "feature";
18 18 static final String VERSION_KEY = "version";
19 19 static final String PAYLOAD_KEY = "payload";
... ... @@ -56,22 +56,22 @@ public class HeartbeatService
56 56 * @param requestBody the body of the request as JSON containing all necessary data to save
57 57 * @return response body JSON as String
58 58 */
59   - public String saveData(String requestBody) throws IOException
  59 + public String ingestData(String requestBody) throws IOException
60 60 {
61 61 if (requestBody == null)
62 62 {
63 63 throw new IllegalArgumentException("The provided request body is null.");
64 64 }
65 65 ObjectMapper mapper = new ObjectMapper();
66   - String uuid = null;
  66 + String repositoryId = null;
67 67 String feature = null;
68 68 String version = null;
69 69 String payload = null;
70 70 try
71 71 {
72 72 JsonNode actualObj = mapper.readTree(requestBody);
73   - uuid = actualObj.get(UUID_KEY) == null ?
74   - null : actualObj.get(UUID_KEY).asText();
  73 + repositoryId = actualObj.get(REPOSITORY_ID_KEY) == null ?
  74 + null : actualObj.get(REPOSITORY_ID_KEY).asText();
75 75 feature = actualObj.get(FEATURE_KEY) == null ?
76 76 null : actualObj.get(FEATURE_KEY).asText();
77 77 version = actualObj.get(VERSION_KEY) == null ?
... ... @@ -83,29 +83,28 @@ public class HeartbeatService
83 83 {
84 84 throw ioe;
85 85 }
86   - if (!isMandatoryDataValid(uuid, feature, version, payload))
  86 + if (!isMandatoryDataValid(repositoryId, feature, version, payload))
87 87 {
88 88 throw new IllegalArgumentException("The request body is invalid." +
89 89 "\n Received: " + requestBody +
90 90 "\n Parsed: " +
91   - UUID_KEY + " = " + uuid + ", " +
  91 + REPOSITORY_ID_KEY + " = " + repositoryId + ", " +
92 92 FEATURE_KEY + " = " + feature + ", " +
93 93 VERSION_KEY + " = " + version + ", " +
94 94 PAYLOAD_KEY + " = " + payload);
95 95 }
96   - heartbeatDAO.putItem(uuid, feature, version, Long.toString(System.currentTimeMillis()), payload);
  96 + heartbeatDAO.putItem(repositoryId, feature, version, Long.toString(System.currentTimeMillis()), payload);
97 97 ObjectNode responseNode = mapper.createObjectNode();
98 98 responseNode.put("success", true);
99 99 return responseNode.toString();
100 100 }
101 101  
102   -
103   - private boolean isMandatoryDataValid(String uuid,
  102 + private boolean isMandatoryDataValid(String repositoryId,
104 103 String feature,
105 104 String version,
106 105 String payload)
107 106 {
108   - return !(uuid == null || uuid.isEmpty() ||
  107 + return !(repositoryId == null || repositoryId.isEmpty() ||
109 108 feature == null || feature.isEmpty() ||
110 109 version == null || version.isEmpty() ||
111 110 payload == null || payload.isEmpty());
... ...
ingestion-service/src/test/java/org/alfresco/heartbeat/service/HeartbeatDAOTest.java
1 1 index 8eb30143b8ba4a401794827d28386297e9483357..0c5f3a0d322a6897a8d23f14aec1c0097d0ae2e4 100644
2   --- a/src/test/java/org/alfresco/heartbeat/service/HeartbeatDAOTest.java
  2 +++ b/ingestion-service/src/test/java/org/alfresco/heartbeat/service/HeartbeatDAOTest.java
... ... @@ -80,7 +80,7 @@ public class HeartbeatDAOTest
80 80 HeartbeatDAO dao = new HeartbeatDAO(dynamoDB, TABLE_NAME);
81 81 String timestamp = Long.toString(System.currentTimeMillis());
82 82 dao.putItem(
83   - HeartbeatServiceTest.UUID,
  83 + HeartbeatServiceTest.REPOSITORY_ID,
84 84 HeartbeatServiceTest.FEATURE,
85 85 HeartbeatServiceTest.VERSION,
86 86 timestamp,
... ... @@ -89,7 +89,7 @@ public class HeartbeatDAOTest
89 89 Table table = dynamoDB.getTable(TABLE_NAME);
90 90 PrimaryKey primaryKey = new PrimaryKey(
91 91 HeartbeatDAO.PRIMARY_PARTITION_KEY,
92   - dao.createPrimaryPartitionKey(HeartbeatServiceTest.UUID, HeartbeatServiceTest.FEATURE),
  92 + dao.createPrimaryPartitionKey(HeartbeatServiceTest.REPOSITORY_ID, HeartbeatServiceTest.FEATURE),
93 93 HeartbeatDAO.PRIMARY_SORT_KEY,
94 94 timestamp);
95 95 Item item = table.getItem(primaryKey);
... ... @@ -97,7 +97,7 @@ public class HeartbeatDAOTest
97 97 assertEquals("The values of the saved item should match.",
98 98 timestamp, item.getString(HeartbeatDAO.PRIMARY_SORT_KEY));
99 99 assertEquals("The values of the saved item should match.",
100   - HeartbeatServiceTest.UUID, item.getString(HeartbeatService.UUID_KEY));
  100 + HeartbeatServiceTest.REPOSITORY_ID, item.getString(HeartbeatService.REPOSITORY_ID_KEY));
101 101 assertEquals("The values of the saved item should match.",
102 102 HeartbeatServiceTest.FEATURE, item.getString(HeartbeatService.FEATURE_KEY));
103 103 assertEquals("The values of the saved item should match.",
... ...
ingestion-service/src/test/java/org/alfresco/heartbeat/service/HeartbeatServiceTest.java
1 1 index 3bdd166c205d28f5dcbdebb218c9b7e8b944eb8f..4ab4b3b862390cabe4bda0341d32cdec2eeee4f5 100644
2   --- a/src/test/java/org/alfresco/heartbeat/service/HeartbeatServiceTest.java
  2 +++ b/ingestion-service/src/test/java/org/alfresco/heartbeat/service/HeartbeatServiceTest.java
... ... @@ -6,7 +6,7 @@ import org.mockito.Mockito;
6 6  
7 7 import static org.alfresco.heartbeat.service.HeartbeatService.FEATURE_KEY;
8 8 import static org.alfresco.heartbeat.service.HeartbeatService.PAYLOAD_KEY;
9   -import static org.alfresco.heartbeat.service.HeartbeatService.UUID_KEY;
  9 +import static org.alfresco.heartbeat.service.HeartbeatService.REPOSITORY_ID_KEY;
10 10 import static org.alfresco.heartbeat.service.HeartbeatService.VERSION_KEY;
11 11 import static org.junit.Assert.fail;
12 12 import static org.mockito.Matchers.eq;
... ... @@ -23,13 +23,13 @@ public class HeartbeatServiceTest
23 23 private HeartbeatDAO dao = mock(HeartbeatDAO.class);
24 24 private HeartbeatService heartbeatService;
25 25  
26   - static final String UUID = "test-uuid-1";
  26 + static final String REPOSITORY_ID = "test-repository-id-1";
27 27 static final String FEATURE = "Alfresco Content Services";
28 28 static final String VERSION = "5.2.1";
29 29 static final String PAYLOAD = "{\"users\":\"1000\",\"documents\":\"10000\"}";
30 30 private static final String VALID_INPUT_JSON =
31 31 "{\n" +
32   - "\"" + UUID_KEY + "\" : \"%s\",\n" +
  32 + "\"" + REPOSITORY_ID_KEY + "\" : \"%s\",\n" +
33 33 "\"" + FEATURE_KEY + "\" : \"%s\",\n" +
34 34 "\"" + VERSION_KEY + "\" : \"%s\",\n" +
35 35 "\"" + PAYLOAD_KEY + "\" : %s" +
... ... @@ -38,7 +38,7 @@ public class HeartbeatServiceTest
38 38 /* doesn't have the payload */
39 39 private static final String INVALID_INPUT_JSON =
40 40 "{\n" +
41   - "\"" + UUID_KEY + "\" : \"%s\",\n" +
  41 + "\"" + REPOSITORY_ID_KEY + "\" : \"%s\",\n" +
42 42 "\"" + FEATURE_KEY + "\" : \"%s\",\n" +
43 43 "\"" + VERSION_KEY + "\" : \"%s\"" +
44 44 "}";
... ... @@ -53,8 +53,8 @@ public class HeartbeatServiceTest
53 53 @Test
54 54 public void testSaveWithValidData() throws Exception
55 55 {
56   - heartbeatService.saveData(String.format(VALID_INPUT_JSON, UUID, FEATURE, VERSION, PAYLOAD));
57   - verify(dao).putItem(eq(UUID), eq(FEATURE), eq(VERSION), Mockito.anyString(), eq(PAYLOAD));
  56 + heartbeatService.ingestData(String.format(VALID_INPUT_JSON, REPOSITORY_ID, FEATURE, VERSION, PAYLOAD));
  57 + verify(dao).putItem(eq(REPOSITORY_ID), eq(FEATURE), eq(VERSION), Mockito.anyString(), eq(PAYLOAD));
58 58 }
59 59  
60 60 @Test
... ... @@ -62,7 +62,7 @@ public class HeartbeatServiceTest
62 62 {
63 63 try
64 64 {
65   - heartbeatService.saveData(String.format(INVALID_INPUT_JSON, UUID, FEATURE, VERSION));
  65 + heartbeatService.ingestData(String.format(INVALID_INPUT_JSON, REPOSITORY_ID, FEATURE, VERSION));
66 66 fail("The IllegalArgumentException should have been thrown.");
67 67 }
68 68 catch (IllegalArgumentException iae)
... ...
... ... @@ -10,74 +10,59 @@
10 10 <version>8</version>
11 11 </parent>
12 12  
13   - <artifactId>heartbeat</artifactId>
  13 + <artifactId>heartbeat-parent</artifactId>
14 14 <version>2.0-SNAPSHOT</version>
  15 + <packaging>pom</packaging>
15 16  
16 17 <properties>
17 18 <aws.dynamodb.sdk.version>1.11.69</aws.dynamodb.sdk.version>
18 19 <aws.lambda.core.version>1.1.0</aws.lambda.core.version>
  20 + <aws.lambda.events.version>1.3.0</aws.lambda.events.version>
19 21 <jackson.core.version>2.8.5</jackson.core.version>
20 22 <junit.version>4.12</junit.version>
21 23 <mockito.version>1.10.19</mockito.version>
22 24 </properties>
23 25  
24   - <dependencies>
25   - <dependency>
26   - <groupId>com.amazonaws</groupId>
27   - <artifactId>aws-lambda-java-core</artifactId>
28   - <version>${aws.lambda.core.version}</version>
29   - </dependency>
30   - <dependency>
31   - <groupId>com.amazonaws</groupId>
32   - <artifactId>aws-java-sdk-dynamodb</artifactId>
33   - <version>${aws.dynamodb.sdk.version}</version>
34   - </dependency>
35   - <dependency>
36   - <groupId>com.fasterxml.jackson.core</groupId>
37   - <artifactId>jackson-core</artifactId>
38   - <version>${jackson.core.version}</version>
39   - </dependency>
40   - <dependency>
41   - <groupId>junit</groupId>
42   - <artifactId>junit</artifactId>
43   - <version>${junit.version}</version>
44   - <scope>test</scope>
45   - </dependency>
46   - <dependency>
47   - <groupId>org.mockito</groupId>
48   - <artifactId>mockito-all</artifactId>
49   - <version>${mockito.version}</version>
50   - <scope>test</scope>
51   - </dependency>
52   - </dependencies>
  26 + <dependencyManagement>
  27 + <dependencies>
  28 + <dependency>
  29 + <groupId>com.amazonaws</groupId>
  30 + <artifactId>aws-lambda-java-core</artifactId>
  31 + <version>${aws.lambda.core.version}</version>
  32 + </dependency>
  33 + <dependency>
  34 + <groupId>com.amazonaws</groupId>
  35 + <artifactId>aws-lambda-java-events</artifactId>
  36 + <version>${aws.lambda.events.version}</version>
  37 + </dependency>
  38 + <dependency>
  39 + <groupId>com.amazonaws</groupId>
  40 + <artifactId>aws-java-sdk-dynamodb</artifactId>
  41 + <version>${aws.dynamodb.sdk.version}</version>
  42 + </dependency>
  43 + <dependency>
  44 + <groupId>com.fasterxml.jackson.core</groupId>
  45 + <artifactId>jackson-core</artifactId>
  46 + <version>${jackson.core.version}</version>
  47 + </dependency>
  48 + <dependency>
  49 + <groupId>junit</groupId>
  50 + <artifactId>junit</artifactId>
  51 + <version>${junit.version}</version>
  52 + <scope>test</scope>
  53 + </dependency>
  54 + <dependency>
  55 + <groupId>org.mockito</groupId>
  56 + <artifactId>mockito-all</artifactId>
  57 + <version>${mockito.version}</version>
  58 + <scope>test</scope>
  59 + </dependency>
  60 + </dependencies>
  61 + </dependencyManagement>
53 62  
54   - <!--Custom repository:-->
55   - <repositories>
56   - <repository>
57   - <id>dynamodb-local</id>
58   - <name>DynamoDB Local Release Repository</name>
59   - <url>http://dynamodb-local.s3-website-us-west-2.amazonaws.com/release</url>
60   - </repository>
61   - </repositories>
62   -
63   - <build>
64   - <plugins>
65   - <plugin>
66   - <groupId>org.apache.maven.plugins</groupId>
67   - <artifactId>maven-shade-plugin</artifactId>
68   - <configuration>
69   - <createDependencyReducedPom>false</createDependencyReducedPom>
70   - </configuration>
71   - <executions>
72   - <execution>
73   - <phase>package</phase>
74   - <goals>
75   - <goal>shade</goal>
76   - </goals>
77   - </execution>
78   - </executions>
79   - </plugin>
80   - </plugins>
81   - </build>
  63 + <modules>
  64 + <module>aggregation-service</module>
  65 + <module>ingestion-service</module>
  66 + </modules>
82 67  
83 68 </project>
84 69 \ No newline at end of file
... ...