Skip to end of metadata
Go to start of metadata

You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 28 Next »

Binary Data Service

The Binary Data Service is an abstraction that hides the complexity of storing and retrieving binary data using configurable providers.

Uses the concept of sessions to buckets in order to give a logical separation between different scopes. For example, "config" could be a bucket to store all data related to the configurations scope, and it will be represented as a folder in the local file system provider.

@Inject
protected BinaryDataService binaryDataService;

protected BinaryDataServiceSession session;


@PostConstruct
void init() {
  session = binaryDataService.newSession("config");
}


public void storeConfig(String key, InputStream inputStream) {
  session.store(key, inputStream);
}


public Optional<InputStream> findConfig(String key) {
  return session.find(key);
}

Providers

Local File System

The default provider. Uses the local file system, or any mounted drive as the storage:

storage:
  binary:
    provider: local
    dataPath: other

Configuration parameters:

storage.binary.provider

(Optional, String) If given, must be local.

storage.binary.dataPath

(Optional, String) The path where the data will be stored. Default is ${tmp}/binary, where ${tmp} is the default temporary folder of the OS where the application runs.

SFTP

Uses a remote SFTP server as storage:

storage:
  binary:
    provider: sftp
    dataPath: other
    server: sftp://user:pass@localhost:22
    connection:
      keepAliveInterval: 1m
      connectTimeout: 30s
      readTimeout: 30s

Keep in mind that SFTP is known to be slow. Use it only as last resource

Configuration parameters:

storage.binary.provider

(Required, String) Must be sftp.

storage.binary.dataPath

(Optional, String) The path where the data will be stored. Default is /binary.

storage.binary.server

(Optional, String) The URI to the SFTP server. Default is sftp://pdp:pdp@localhost:22.

storage.binary.connection.keepAliveInterval

(Optional, Duration) The interval to send a keep alive signal to the server. Default is 30s.

storage.binary.connection.connectTimeout

(Optional, Duration) The timeout for the connection to the server. Default is 5s.

storage.binary.connection.readTimeout

(Optional, Duration) The timeout for the reading from the server. Default is 3s.

Cache Manager

The local cache is an extension to the one provided by Micronaut and implemented with Caffeine. It simplifies the process of caching entities with the use of annotations and configurations in the application.yml

Configuration

storage:
  cache:
    local:
      myEntity:
        initialCapacity: 10
        maximumSize: 50
        maximumWeight: 100
        expireAfterWrite: 1h
        expireAfterAccess: 5m
        recordStats: true
        testMode: false

For a cache named myEntity with the following properties:

| Property | Type | Default | Description | | ----------------- | :------: | :-----: | ----------- | | initialCapacity | Integer | 16 | The minimum size of the cache | | maximumSize | Long | | The maximum size of the cache. Can not be combined with a weigher | | maximumWeight | Long | | The maximum weight to be allowed for an element in the cache (see Weigher section) | | expireAfterWrite | Duration | | The time to wait to expire an element after its creation | | expireAfterAccess | Duration | 5m | The time to wait to expire an element after the last time it was accessed | | recordStats | boolean | true | To record statistics about hit rates and evictions (see Cache Statistics section) | | testMode | boolean | false | To execute all cache operations in a single thread |

Each cache must have a unique name, which will be automatically normalized to kebab-case (i.e. myEntity becomes my-entity)

A default configuration for a cache can be defined as a bean:

@Factory
@Requires(missingProperty = LocalCacheProperties.PREFIX + "." + MyEntityCacheConfig.CACHE_NAME)
public class MyEntityCacheConfig {
  public static final String CACHE_NAME = "my-entity";


  @Bean
  @Named(CACHE_NAME)
  LocalCacheProperties cacheProperties(ApplicationConfiguration applicationConfiguration) {
    return LocalCacheProperties.builder()
        .cacheName(CACHE_NAME)
        .applicationConfiguration(applicationConfiguration)
        .build();
  }
}

Weigher

A weigher determines if an element becomes too heavy to be in the cache. If for a cache's maximumWeight there is not a corresponding named weigher, any other weigher will be selected. If there is no registered weigher, a default weigher where every element has a weight of 1 will be used:

import com.github.benmanes.caffeine.cache.*;

@Singleton
@Named(MyEntityCacheConfig.CACHE_NAME)
public class MyEntityCacheWeigher implements Weigher<UUID, MyEntity> {
  @Override
  public @NonNegative int weigh(@NonNull UUID key, @NonNull MyEntity value) {
    return 0;
  }
}

Removal Listener

A listener that triggers every time an element is evicted:

import com.github.benmanes.caffeine.cache.*;

@Singleton
@Named(MyEntityCacheConfig.CACHE_NAME)
public class MyEntityCacheRemovalListener implements RemovalListener<UUID, MyEntity> {
  @Override
  public void onRemoval(@Nullable UUID key, @Nullable MyEntity value, @NonNull RemovalCause cause) {
    // Do something with the event
  }
}

Annotation-based caching

Any class (POJOs, Connections, Factories...) can be stored in the cache. For example, instances of the following entity type:

@Data
public class MyEntity implements CoreEntity<UUID> {
  private UUID id;
  private String name;
  ...
}

can be cached in any managed bean with the use of io.micronaut.cache.annotation annotations:

  • @Cacheable
  • @CachePut
  • @CacheInvalidate
@Singleton
@CacheConfig(MyEntityCacheConfig.CACHE_NAME)
public class MyEntityService {
  
  @Inject
  protected MyEntityRepository myEntityRepository;


  @Cacheable(keyGenerator = CoreEntityKeyGenerator.class)
  public List<MyEntity> getAll() {
    return myEntityRepository.getAll();
  }
  
  
  @Cacheable
  public MyEntity getOne(UUID id) {
    return myEntityRepository.getOne(id);
  }


  @CachePut(keyGenerator = CoreEntityKeyGenerator.class)
  public void store(MyEntity myEntity) {
    myEntityRepository.store(myEntity);
  }


  @CacheInvalidate
  public MyEntity delete(UUID id) {
    return myEntityRepository.delete(id);
  }
}

The key for the cacheable object must implement equals() and hashCode()

Note that for the getAll() and store(MyEntity) methods, a custom key generator needs to be specified. This way the cache will calculate the appropriate key for each entity. If no generator is defined, the DefaultCacheKeyGenerator is used.

The CoreEntityKeyGenerator can be used with any entity that implements CoreEntity<T>

Multiple caches can be configured in the same @CacheConfig, in which case, the name of the used cache must be specified. Likewise, the key for the cached value can be a composite of multiple objects (internally wrapped as a ParametersKey and generated by a DefaultCacheKeyGenerator):

@Singleton
@CacheConfig({ "cacheA", "cacheB" })
public class MyMultiCacheService {
  
  @Cacheable("cacheA")
  public MyEntity getOneA(UUID id) {
    ...
  }
  

  @Cacheable("cacheB")
  public MyEntity getOneB(UUID id, UUID parentId) {
    ...
  }
}

Cache Statistics

If the cache statistics are enabled, they will be published as part of the application metrics:

  • cache.eviction.weight - the sum of weights of evicted entries
  • cache.evictions - the count of cache evictions
  • cache.size - the estimated number of entries in the cache
  • cache.gets - the number of times a cache-annotated method has returned an item (regardless if it was cached or not). This metric can be refined with the use of tags:
    • result:hit - the number of times cache lookup methods have returned a cached value
    • result:miss - the number of times cache lookup methods have returned an uncached value

If the application has multiple caches, the metrics can be filtered with the cache:my-entity tag

Collections

Flushable Collection

A data structure that asynchronously flushes its content any time a preconfigured criteria is met. It is backed up by an ArrayList<T> and it is guaranteed to be thread-safe.

Configuration

Basic Properties

| Property | Type | Default | Description | | ------------ | :------: | :-----: | ----------- | | maxCount | Integer | | The maximum number of elements before flushing. Triggers a Count flush | | maxDataSize | Long | | The maximum size of the collection elements before flushing. Triggers a Data Size flush | | flushAfter | Duration | | The duration before flushing. Triggers a Scheduled flush | | threads | Integer | 5 | The number of threads used to execute the flush event | | flushTimeout | Duration | 10m | The timeout for the flush event |

Properties Template

For a collection of type my-collection, a set of default properties can be defined as a bean or in the application.yml:

collections:
  flushable:
    myCollection:
      maxCount: 10
      maxDataSize: 1mb
      flushAfter: 5m
      threads: 10
      flushTimeout: 10m

The same configuration can be defined as:

@Factory
@Requires(missingProperty = FlushableCollectionProperties.PREFIX + "." + MyFlushableCollectionConfig.TYPE)
public class MyFlushableCollectionConfig {
  public static final String TYPE = "my-collection";


  @Bean
  @Named(TYPE)
  FlushableCollectionProperties collectionProperties() {
    return FlushableCollectionProperties.builder()
        .type(TYPE)
        .maxCount(10)
        .maxDataSize(DataSize.ofMegabytes(1).asBytes())
        .flushAfter(Duration.ofMinutes(5))
        .threads(10)
        .flushTimeout(Duration.ofMinutes(10))
        .build();
  }
}

Each collection definition must have a unique name, which will be automatically normalized to kebab-case (i.e. myCollection becomes my-collection)

Action Handlers

Flush Handler

Consumer to be called when the flush event is triggered. For instance, a flush handler for a collection of integer elements can be defined as:

public class MyCollectionFlushHandler implements Consumer<FlushableCollection.Batch<Integer>> {
  @Override
  public void accept(FlushableCollection.Batch<Integer> batch) {
    // Do something with the batch
  }
}

By default, does nothing: builder.flushHandler(batch -> {})

Weigher

Function to determine the size of an element. Required to trigger a Data Size flush. For instance, a collection of integer elements can define its weigher as:

public class MyCollectionWeigher implements Function<Integer, Long> {
  @Override
  public Long apply(Integer element) {
    return element.toString().length();
  }
}

By default, the weight is calculated after converting the element to String and counting its number of bytes using UTF-8 encoding.

Success Handler

Consumer to be executed if the flush handler was successfully executed. For instance, a success handler for a collection of integer elements can be defined as:

public class MyCollectionSuccessHandler implements Consumer<FlushableCollection.Batch<Integer>> {
  @Override
  public void accept(FlushableCollection.Batch<Integer> batch) {
    // Do something with the successful batch
  }
}

By default, logs a debug message with the details of the processed batch: builder.successHandler(batch -> log.debug(...))

Failure Handler

BiConsumer to be executed if the flush handler failed. For instance, a failure handler for a collection of integer elements can be defined as:

public class MyCollectionFailureHandler implements Consumer<FlushableCollection.Batch<Integer>, Throwable> {
  @Override
  public void accept(FlushableCollection.Batch<Integer> batch, Throwable ex) {
    // Do something with the failed batch
  }
}

By default, logs an error message with the details of the batch and its exception: builder.failureHandler((batch, ex) -> log.error(...)). If the flush event causes a timeout, the input Throwable will be of type java.util.concurrent.TimeoutException.

Usage

The collection must be created through the FlushableCollectionFactory bean, by providing the expected type. If the application context finds a FlushableCollectionProperties with the same name, it will be used as template for the new collection. Otherwise, a new properties set with default values will be created. Note that any pre-defined property can be overridden during the build phase.

@Inject
protected FlushableCollectionFactory flushableCollectionFactory;

void submit() {
  try (var collection = flushableCollectionFactory.<Integer>builder("my-collection")
        .flushHandler(new MyCollectionFlushHandler())
        .weigher(new MyCollectionWeigher())
        .successHandler(new MyCollectionSuccessHandler())
        .failureHandler(new MyCollectionFailureHandler())
        .build()) {
    
    for (int i = 0; i < 10; i++) {
      collection.add(i);
    }
  }
}

Flush Events

  • COUNT - Triggers if the collection contains more elements than the value defined in the maxCount property. If undefined, it will never be triggered.
  • DATA_SIZE - Triggers if the size of the elements in the collection is greater than the value defined in the maxDataSize property. If undefined, it will never be triggered.
  • SCHEDULED - Triggers based on the schedule defined with the flushAfter property. If undefined, it will never be triggered.
  • MANUAL - Triggers whenever the collection.flush() method is called.
  • CLOSE - Triggers whenever the collection is closed by either using a try with resources, or by calling the collection.close() method.

Flush Metrics

Each collection will publish metrics about the duration of each flush event, its size, and the count of success/failures (see Metrics section).

This can be refined with the use of custom tags during the creation of the collection:

try (var collection = flushableCollectionFactory.<Integer>builder("my-collection")
        .tag("key", "value")
        .build()) {
  // Do something with the collection
}

The metrics will then be available in:

  • GET /metrics/pdp.collections.flushable.[type] - The count of successful and failed flush events.
  • GET /metrics/pdp.collections.flushable.[type].duration - The duration for the flush handler.
  • GET /metrics/pdp.collections.flushable.[type].size - The size of the flushed elements.

DSL

The PDP DSL is an abstract definition for a common language to be used in any PDP product. Its intention is to have a standardized way to express configurations that might be interpreted in a different way according to the needs of the product itself.

Filters

A filter is a criteria to be applied to a given object. In order to use it, the FilterAdapter interface needs to be implemented. The core supports the following concrete adapters:

  • MapFilterAdapter - Converts the filter into a predicate used to evaluate Map<String, Object> structures. The expected field name is the key of the map.
  • JsonPathFilterAdapter - Converts the filter into a predicate used to evaluate DocumentContext structures. The expected field name is a JSON Path to be found in the JSON document.
  • JsonPointerFilterAdapter - Converts the filter into a predicate used to evaluate ObjectNode structures. The expected field name is a JSON Pointer to be found in the JSON document.

All filters have an optional source field that could be used by the concrete implementation to select among multiple data structures

"Equals" Filter

The value of the field must be exactly as the one provided.

var filter = EqualsFilter.builder().field("field").value("value").build();
{
  "equals": {
    "field": "field",
    "value": "value"
  }
}

"Greater Than" Filter

The value of the field must be greater than the one provided.

var filter = GreaterThanFilter.builder().field("field").value(1).build();
{
  "gt": {
    "field": "field",
    "value": 1
  }
}

"Greater Than or Equals" Filter

The value of the field must be greater than or equals to the one provided.

var filter = GreaterThanOrEqualsFilter.builder().field("field").value(1).build();
{
  "gte": {
    "field": "field",
    "value": 1
  }
}

"Less Than" Filter

The value of the field must be less than the one provided.

var filter = LessThanFilter.builder().field("field").value(1).build();
{
  "lt": {
    "field": "field",
    "value": 1
  }
}

"Less Than or Equals" Filter

The value of the field must be less than or equals to the one provided.

var filter = LessThanOrEqualsFilter.builder().field("field").value(1).build();
{
  "lte": {
    "field": "field",
    "value": 1
  }
}

"In" Filter

The value of the field must be one of the provided values.

var filter = InFilter.builder().field("field").value("valueA").value("valueB").build();
{
  "in": {
    "field": "field",
    "values": [
      "valueA",
      "valueB"
    ]
  }
}

"Empty" Filter

Checks if a field is empty:

  • For a collection, true if its size is 0
  • For a String, true if its length is 0
  • For any other type, true if it is null
var filter = EmptyFilter.builder()
        .field("field")
        .build();
{
  "empty": {
    "field": "field"
  }
}

"Exists" Filter

Checks if a field exists.

var filter = ExistsFilter.builder()
        .field("field")
        .build();
{
  "exists": {
    "field": "field"
  }
}

"Not" Filter

Negates the inner clause.

var filter = NotFilter.builder()
        .clause(EqualsFilter.builder().field("field").value("value").build())
        .build();
{
  "not": {
    "equals": {
      "field": "field",
      "value": "value"
    }
  }
}

"Null" Filter

Checks if a field is null. Note that while the "exists" filter checks whether the field is present or not, the "null" filter expects the field to be present but with null value.

var filter = NullFilter.builder()
        .field("field")
        .build();
{
  "null": {
    "field": "field"
  }
}

Boolean Operators

"And" Filter

All conditions in the list must be evaluated to true.

var filter = AndFilter.builder()
    .clause(EqualsFilter.builder().field("fieldA").value("valueA").build())
    .clause(EqualsFilter.builder().field("fieldB").value("valueB").build())
    .build();
{
  "and": [
    {
      "equals": {
        "field": "fieldA",
        "value": "valueA"
      }
    }, {
      "equals": {
        "field": "fieldB",
        "value": "valueB"
      }
    }
  ]
}

"Or" Filter

At least one condition in the list must be evaluated to true.

var filter = OrFilter.builder()
    .clause(EqualsFilter.builder().field("fieldA").value("valueA").build())
    .clause(EqualsFilter.builder().field("fieldB").value("valueB").build())
    .build();
{
  "or": [
    {
      "equals": {
        "field": "fieldA",
        "value": "valueA"
      }
    }, {
      "equals": {
        "field": "fieldB",
        "value": "valueB"
      }
    }
  ]
}

HTTP Base Client

The HTTP Base Client is an abstraction of the OkHttpClient. It provides features such as retry handlers and automated paginated requests.

The minimum initialization for the client defines the configuration of the HTTP connections and other shared properties to be used by the endpoints:

public class MyClient extends HttpBaseClient {
  protected MyClient(
      @NonNull OkHttpClient client,
      @NonNull ObjectMapper objectMapper,
      @NonNull BackoffPolicyProperties backoffPolicy,
      MeterRegistry meterRegistry,
      @NonNull ScrollProperties scroll)
  {
    super(client, objectMapper, backoffPolicy, scroll);
  }

  
  @Override
  public boolean ping() {
    return true;
  }


  public static Builder builder() {
    return new Builder();
  }
  

  public static final class Builder extends HttpBaseClientBuilder<Builder, MyClient> {
    @Override
    public MyClient build() {
      return new MyClient(
          newClientBuilder().build(),
          getObjectMapper(),
          getBackoffPolicy(),
          getScroll());
    }
  }
}

The builder can be also initialized with a HttpBaseClientProperties instance:

public class MyClient extends HttpBaseClient {
  ...

  public static Builder builder(HttpBaseClientProperties clientProperties) {
    return new Builder(clientProperties);
  }
  
  @NoArgsConstructor
  public static final class Builder extends HttpBaseClientBuilder<Builder, MyClient> {
    protected Builder(HttpBaseClientProperties clientProperties) {
      super(clientProperties);
    }
    
    ...
  }
}

If some configurations are not desired on the implemented client, they can be disabled in the builder:

public class MyClient extends HttpBaseClient {
  ...

  @HttpBaseClientBuilder.DisabledConfig({
      HttpBaseClientBuilder.Config.COMPRESS_REQUESTS,
      HttpBaseClientBuilder.Config.FOLLOW_REDIRECTS,
      HttpBaseClientBuilder.Config.SCROLL
  })
  public static final class Builder extends HttpBaseClientBuilder<Builder, MyClient> {
    ...
  }
}

any attempt to set a disabled configuration will throw an UnsupportedOperationException.

By default, the retry of the executeWithBackoff(...) method will happen if the HTTP response code is:

  • 408 - Request Time Out
  • 429 - Too Many Request
  • 500 - Internal Server Error
  • 504 - Gateway Timeout

The condition can be changed by sending a different predicate to the method:

client.executeWithBackoff(requestDetails, response -> false);

In this case, the request will not be retried because of a status code (although it might be retried because of an exception during the execution).

HTTP Round Robin Client

The HTTP Round Robin Client is a specialization of the HttpBaseClient backed up by a round-robin collection. Provides the corresponding classes as described in the previous section: HttpRoundRobinClient, HttpRoundRobinClientBuilder, and HttpRoundRobinClientProperties.

HTTP Client Request

The request object is a simple POJO with the properties needed to execute the HTTP call:

@RequiredArgsConstructor
public class MyRequest implements HttpClientRequest {
  private final String id;
}

For cases when no data is needed in the request, an empty implementation is avaliable in HttpRoundRobinClient#EMPTY_REQUEST

HTTP Client Pageable Request

The HttpClientPageableRequest interface is an extension of HttpClientRequest and defines default methods to handle pagination parameters.

HTTP Client Response

Single Response

Expected when the response is a single payload.

public class MySinglePayloadResponse extends AbstractSingleResponse<MyRequest> {
  public MySinglePayloadResponse(
      ObjectMapper objectMapper,
      HttpClientRequestDetails<MyRequest> requestDetails,
      Response httpResponse
  ) {
    super(objectMapper, requestDetails, httpResponse);
  }


  @Override
  protected void handleSuccess(Response httpResponse) {
    // Do something with the response
  }
}

With this definition, the endpoint can be exposed in the client:

public class MyClient extends HttpRoundRobinClient {
  ...
  public MySinglePayloadResponse get(MyRequest clientRequest) {
    var url = serverHosts.next()
        .newBuilder()
        .addPathSegment("content")
        .addPathSegment(request.getId())
        .build();

    var requestDetails = new HttpClientRequestDetails<>(
        new Request.Builder().url(url).get().build(),
        clientRequest);

    return new MySinglePayloadResponse(
        objectMapper,
        requestDetails,
        executeWithBackoff(requestDetails)
    );
  }
  ...
}

Both handleSuccess(Response) and handleFailure(Response) have a default implementation that can be overridden. A response is considered a success if its HTTP status code is in the 200 to 299 range and its body is not null (not to be confused with an empty body). This check can be changed in the constructor:

public class MySinglePayloadResponse extends AbstractSingleResponse<MyRequest> {
  public MySinglePayloadResponse(
      ObjectMapper objectMapper,
      HttpClientRequestDetails<MyRequest> requestDetails,
      Response httpResponse
  ) {
    super(objectMapper, requestDetails, httpResponse, r -> r.code() == 400);
  }
}

If the response was a failure, but it had a payload, it can be retrieved with AbstractSingleResponse#getErrorMessage(), or AbstractSingleResponse#getErrorMessage(Class) (to convert the payload into the given class type).

Iterable Response

Expected when the response is a full collection of iterable elements with no pagination. It is assumed that the payload of the response is a JSON array (if that's not the case, the handleSuccess(Response httpResponse) method must be overridden).

public class MyIterableResponse extends AbstractIterableResponse<MyRequest, String> {
  public MyIterableResponse(
      ObjectMapper objectMapper,
      HttpClientRequestDetails<MyRequest> requestDetails,
      Response httpResponse
  ) {
    super(objectMapper, requestDetails, httpResponse);
  }


  @Override
  protected String parseElement(JsonNode jsonNode) {
    return jsonNode.asText();
  }
}

With this definition, the endpoint can be exposed in the client:

public class MyClient extends HttpRoundRobinClient {
  ...
  public MyIterableResponse list(MyRequest clientRequest) {
    var url = serverHosts.next()
        .newBuilder()
        .addPathSegment("list")
        .addPathSegment(request.getId())
        .build();

    var requestDetails = new HttpClientRequestDetails<>(
        new Request.Builder().url(url).get().build(),
        clientRequest);

    return new MyIterableResponse(
        objectMapper,
        requestDetails,
        executeWithBackoff(requestDetails)
    );
  }
  ...
}

Each element in the response can then be retrieved by using the Iterable<> interface.

The predicate to determine if a response was successful can be provided in the constructor, the same way as in the previous sections.

Paginated Response

Token Page

Expected when the response is a consecutive list of pages obtained with a token.

Token Page Response

The response corresponds to each page from the pagination requests. Since it is not possible to automatically determine where is the token for the next page, the AbstractTokenPageResponse#handleSuccess(Response) method must be implemented.

public class MyTokenPageResponse extends AbstractTokenPageResponse<MyRequest, String> {
  public MyTokenPageResponse(
      ObjectMapper objectMapper,
      HttpClientRequestDetails<MyRequest> requestDetails,
      Response httpResponse,
      String currentToken
  ) {
    super(objectMapper, requestDetails, httpResponse, currentToken);
  }


  @Override
  protected void handleSuccess(Response response) {
    try {
      var json = objectMapper.readTree(Objects.requireNonNull(response.body()).byteStream());
      this.nextPageReference = json.get("token").asText();

      if (json.has("content")) {
        this.elements = new JsonLazyIterator<>(json.get("content"), this::parseElement);
      } else {
        this.elements = Collections.emptyIterator();
      }
    } catch (Exception ex) {
      throw new DataException(ex.getMessage(), ex);
    }
  }


  @Override
  protected String parseElement(JsonNode jsonNode) {
    return jsonNode.asText();
  }
}

The predicate to determine if a response was successful can be provided in the constructor, the same way as in the previous sections.

Token Page Collection

The response corresponds to a collection of all the page responses that iterates through the data.

public class MyTokenPageCollectionResponse extends AbstractTokenCollectionResponse<MyRequest, String, MyTokenPageResponse> {
  public MyTokenPageCollectionResponse(Function<String, MyTokenPageResponse> pageFunction) {
    super(pageFunction);
  }
}

With this definition, the endpoint can be exposed in the client:

public class MyClient extends HttpRoundRobinClient {
  ...
  public MyTokenPageCollectionResponse paginatedByToken(MyRequest clientRequest) {
    return new MyTokenPageCollectionResponse(
        token -> {
          var urlBuilder = serverHosts.next()
              .newBuilder()
              .addPathSegment("list")
              .addPathSegment(request.getId())
              .addQueryParameter("limit", String.valueOf(scroll.getSize()));

          if (token != null) {
            urlBuilder.addQueryParameter("token", token);
          }

          var requestDetails = new HttpClientRequestDetails<>(
              new Request.Builder().url(urlBuilder.build()).get().build(),
              clientRequest);

          return new MyTokenPageResponse(
              objectMapper,
              requestDetails,
              executeWithBackoff(requestDetails),
              token
          );
        }
    );
  }
  ...
}

Note that in case of a null token, the function must return the first page.

Each element in the response can then be retrieved by using the Iterable<> interface. The pagination will be handled behind the scenes.

Offset Page

Expected when the response is a consecutive list of pages obtained with a page number.

Offset Page Response

The response corresponds to each page from the pagination requests. It is assumed that the payload of the response is a JSON array (if that's not the case, the handleSuccess(Response httpResponse) method must be overridden).

It is also assumed that the AbstractOffsetPageResponse#getNextPageReference() is always 1 after the current page. This method can be overridden for custom behaviors.

public class MyOffsetPageResponse extends AbstractOffsetPageResponse<MyRequest, String> {
  public MyOffsetPageResponse(
      ObjectMapper objectMapper,
      HttpClientRequestDetails<MyRequest> requestDetails,
      Response httpResponse,
      Long currentOffset
  ) {
    super(objectMapper, requestDetails, httpResponse, currentOffset);
  }


  @Override
  protected String parseElement(JsonNode jsonNode) {
    return jsonNode.asText();
  }
}

The predicate to determine if a response was successful can be provided in the constructor, the same way as in the previous sections.

Pageable Page Response

The response is a specialization of the Offset Page Response, and it is based on a default page structure:

{
  "totalPages": 0,
  "totalElements": 0,
  "size": 0,
  "content": [
    {}
  ],
  "number": 0,
  "first": true,
  "last": true,
  "sort": {
    "sorted": true,
    "unsorted": true,
    "empty": true
  },
  "numberOfElements": 0,
  "pageable": {
    "page": 0,
    "size": 0,
    "sort": [
      "string"
    ]
  },
  "empty": true
}
Offset Page Collection

The response corresponds to a collection of all the page responses that iterates through the data.

public class MyOffsetPageCollectionResponse extends AbstractOffsetCollectionResponse<MyRequest, String, MyOffsetPageResponse> {
  public MyOffsetPageCollectionResponse(Function<String, MyOffsetPageResponse> pageFunction) {
    super(pageFunction);
  }
}

With this definition, the endpoint can be exposed in the client:

public class MyClient extends HttpRoundRobinClient {
  ...
  public MyOffsetPageCollectionResponse paginatedByOffset(MyRequest clientRequest) {
    return new MyOffsetPageCollectionResponse(
        offset -> {
          var urlBuilder = serverHosts.next()
              .newBuilder()
              .addPathSegment("list")
              .addPathSegment(request.getId())
              .addQueryParameter("limit", String.valueOf(scroll.getSize()));

          if (token != null) {
            urlBuilder.addQueryParameter("page", String.valueOf(offset));
          }

          var requestDetails = new HttpClientRequestDetails<>(
              new Request.Builder().url(urlBuilder.build()).get().build(),
              clientRequest);

          return new MyOffsetPageResponse(
              objectMapper,
              requestDetails,
              executeWithBackoff(requestDetails),
              offset
          );
        }
    );
  }
  ...
}

Note that in case of a null token, the function must return the first page.

Each element in the response can then be retrieved by using the Iterable<> interface. The pagination will be handled behind the scenes.

JSON

JSON handling is done with Jackson Project, and the default configuration for the mapper can be instantiated with com.pureinsights.pdp.core.config.CoreSerializer.newInstance().

Default Object Mapper

Registered Modules

Registered Deserializers

  • com.pureinsights.pdp.core.time.CoreDurationDeserializer as an extension of the default DurationDeserializer to allow values in human-readable format (i.e. "1s", "12m", "5h"...). If no suffix is provided, milliseconds are assumed. The supported suffixes are:
    • d - days
    • h - hours
    • m - minutes
    • s - seconds
    • ms - milliseconds
    • ns - nanoseconds

Enabled Features

  • MapperFeature.ACCEPT_CASE_INSENSITIVE_ENUMS
  • MapperFeature.ACCEPT_CASE_INSENSITIVE_VALUES
  • DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY

Disabled Features

  • SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS
  • SerializationFeature.WRITE_DATES_AS_TIMESTAMPS
  • DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES
  • DeserializationFeature.ADJUST_DATES_TO_CONTEXT_TIME_ZONE

Logging Handler

The logging handler simplifies the process of log analysis by using a JSON output with support for custom POJOs.

Logback Configuration

<configuration>
  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder class="net.logstash.logback.encoder.LogstashEncoder">
      <provider class="com.pureinsights.pdp.core.logging.CoreArgumentsProvider"/>
    </encoder>
  </appender>

  <root level="info">
    <appender-ref ref="STDOUT"/>
  </root>
</configuration>

Custom POJO

@Getter
public class MyPOJO {
  private final UUID id = UUID.randomUUID();
}
public class MyPOJOLoggingHandler implements PojoLoggingHandler<MyPOJO> {
  private static final String PARENT_LABEL = "myPOJO";
  private static final String ID_LABEL = "id";


  @Override
  public Class<MyPOJO> getSupportedType() {
    return MyPOJO.class;
  }


  @Override
  public void write(JsonGenerator generator, MyPOJO pojo) throws IOException {
    generator.writeFieldName(PARENT_LABEL);
    generator.writeStartObject();
    generator.writeObjectField(ID_LABEL, pojo.getId());
    generator.writeEndObject();
  }
}

In order to invoke the handler, the class type can call the logging framework:

log.info("Message", new MyPOJO());
log.info("Message {}", new MyPOJO());
log.info("Message {}", "with placeholder", new MyPOJO());
{"@timestamp":"2021-12-08T15:07:11.379Z","@version":1,"message":"Message","logger_name":"LoggerTest","thread_name":"Test worker","level":"INFO","level_value":20000,"myPOJO":{"id":"707ea126-d94b-47b6-9010-72a3acc8a786"}}
{"@timestamp":"2021-12-08T15:07:11.380Z","@version":1,"message":"Message MyPOJO$373141eb","logger_name":"LoggerTest","thread_name":"Test worker","level":"INFO","level_value":20000,"myPOJO":{"id":"398bb1df-29b3-4738-afc6-a7b9e892ce19"}}
{"@timestamp":"2021-12-08T15:07:11.381Z","@version":1,"message":"Message with placeholder","logger_name":"LoggerTest","thread_name":"Test worker","level":"INFO","level_value":20000,"myPOJO":{"id":"00ba3228-f434-4ed0-b45e-62ec9163aac4"}}

Note that the custom POJO is always displayed as expected. The example also demonstrates that the POJO must not have a corresponding placeholder in the message as it will be automatically resolved by the provider. Exceptions must be the in the last position of the input:

...
} catch (Exception ex) {
  log.error(ex.getMessage(), new MyPOJO(), ex);
}

If the logging handler is part of the PDP Core Common Libraries project, it must be registered in the static section of the com.pureinsights.pdp.core.logging.CoreArgumentsProvider class.

Any other logging handler under an application context, must be declared as a bean (i.e. @Singleton).

If none of the above is applicable, the handler must be registered using the CoreArgumentsProvider#registerHandler(PojoLoggingHandler) method.

Metrics

The metrics of the application are collected and exposed by Micrometer. They are enabled by default in the application.yml and exposed through the GET /metrics and GET /metrics/[name] endpoints:

micronaut:
  metrics:
    enabled: true
{
  "names": [
    "http.server.requests",
    "jvm.buffer.count",
    "jvm.buffer.memory.used",
    ...,
    "pdp.metric.count"
  ]
}

When available, the metrics can be filtered with the use of tags: GET /metrics/pdp.metric.count?tag=source:component&tag=type:value1

By default, all metrics include the component tag with the name of the application

Annotation-based metrics

Any method in any bean can be measured with:

  • @Timed
  • @Counted
@Singleton
public class MyService {
  
  @Timed(value = "pdp.myservice.execute", histogram = true)
  public void execute(UUID id) {
    ...
  }


  @Counted("pdp.myservice.validate")
  public void validate(UUID id) {
    ...
  }
}

The name of the metric must be in lowercase, using dot notation

Custom metrics

Instead of using annotations, the metric registry can be injected and manually handled in any bean, with any type of meter:

@Singleton
public class MyCustomService {
  private Counter counter;
  
  
  @PostConstruct
  void init(MeterRegistry meterRegistry) {
    counter = Counter.builder("pdp.mycustomservice.count")
        .tag("key", "value")
        .register(meterRegistry);
  }
  
  
  public void count() {
    counter.increment();
  }
}

Retries

RetryUtils, as its name may suggest, provide utils or methods to handle the retry of a Runnable or Supplier. It also handles the exceptions might occur during the run.

By default, the RetryUtils will use a ConstantDelayRetry, wich will retry 3 times with a delay of 5 seconds.

...
AtomicInteger atomicInteger = new AtomicInteger();
RetryUtils.retry(atomicInteger::getAndIncrement)

You can also specify a custom retry policy passing it through parameters.

...
AtomicInteger atomicInteger = new AtomicInteger();
RetryUtils.retry(atomicInteger::getAndIncrement, new ConstantDelayRetry(2, 100))

In case of any failure during the retry an exception will be thrown and the RetryUtils will handle it. When the number of retries reaches the specified in the retry policy a CoreException will be thrown.

Scheduler

Scheduled tasks are handled by Quartz. Any application can inject the org.quartz.Scheduler bean to register/unregister its own cron jobs.

Any execution of a org.quartz.Job implementation will automatically perform the dependency injection of its visible fields without the need of declaring the class as a singleton.

Message Queue Provider

To handle communication between the different components, PDP uses a message queue. The following is a quick overview of the available configuration fields, examples on how to use the Message Queue classes and the current implementations available with configuration examples.

Configuration

Basic Configuration

| Property | Type | Default | Description | |---------------------------|:------:|:-------:|----------------------------------------------------------------------------------------------| | messageQueue.provider | String | | The name of the MessageQueue provider to use. (e.g. rabbitmq to load the RabbitMQ provider). |

Using the MessageQueueProvider

The MessageQueueProvider instance can be injected as follows:

public class MyClass {

  @Inject
  MessageQueueProvider mqProvider;
  ...

Messages

Message Properties

| Property | Type | Default | Description | |---------------|:-------------------:|:-------:|-------------------------------------------------------------------------------------------| | body | byte[] | | The body of the message. | | queue | String | | The queue this message was sent to or delivered from. | | type | Message.MessageType | | The type of the message. Can be DIRECT or BROADCAST. See Message Types. | | properties | Map<String,String> | | Map of message properties. |

Message Types

Direct Messages

Direct messages are sent to a given queue and are meant to be consumed by a single consumer. The message is persisted in the queue until a consumer is available.

Broadcast Messages

Broadcast messages are sent to a given queue and are meant to be consumed by all consumers available at the moment of sending. The message is not persisted if no consumers are available.

Creating a new Message

The Message class provides a builder to simplify the message creation process:

Message newMessage = Message.builder()
        .body("My message")
        .type(Message.MessageType.DIRECT)
        .queue("queue")
        .property("key", "value")
        .build();

Some considerations with the Message builder:

  • There are multiple ways to set the message body. If more than one is set, only the last one will be used.
  • Every time the property method is called, the given property is appended to the map.
  • Calling properties method will overwrite any previously set properties.

Consumers

Configuration

| Property | Type | Default | Description | |---------------------------------------|:-------:|:-------:|-------------------------------------------------------------------------------| | consumer.{name}.maxMessages | Integer | 5 | Maximum number of messages to consume in parallel. | | consumer.{name}.maxMessageRequeues | Integer | 5 | Maximum number of times a message should be sent back to the queue for retry. |

Example

messageQueue:
  consumer:
      directConsumer:
        maxMessages: 4
        maxMessageRequeues: 6

This configuration can be loaded as follows

protected void myMethod(@Named("direct-consumer") ConsumerProperties consumerProperties) {
  try {
    var maxRequeues = consumerProperties.getMaxMessageRequeues();
    ...
  }
}

Registering Consumers

To register a consumer do the following:

public class MyClass {

  @Inject
  MessageQueueProvider mqProvider;
  ...
  
  protected void myMethod(@Named("direct-consumer") ConsumerProperties consumerProperties) {
    try {
      mqProvider.registerConsumer(queue, consumerProperties, Message.MessageType.DIRECT, this::consumeMessage);    
    ...
    }
  }

The registerConsumer method takes the following parameters:

| Parameter | Type | Description | |--------------------|:--------------------:|----------------------------------------------------------------------------------------------------------------------------------------------------------------------| | queue | String | Name of the queue to register the Consumer to. | | consumerProperties | ConsumerProperties | A ConsumerProperties instance for the consumer. | | messageType | Message.MessageType | The type of message this consumer is supposed to listen to. Can be Message.MessageType.DIRECT or Message.MessageType.BROADCAST. See Message Types. | | onMessageDelivery | Predicate<Message> | The function to execute when a message is received. Returns true if the message is successfully consumed, false otherwise. |

Producers

Configuration

| Property | Type | Default | Description | |-------------------------------|:--------:|:-------:|--------------------------------------------------------------------------------------------------| | producer.threads | Integer | 5 | Number of threads available for producers to send asyncMessages. | | producer.sendMessageTimeout | Duration | 30s | Maximum time to wait to get a confirmation that a message has been successfully sent. | | producer.retry.retries | Integer | 5 | Maximum number of times to try to send a message. | | producer.retry.delay | Duration | 1s | Time to wait before retrying. The time is multiplied by the number of retries to backoff between executions. |

Example

messageQueue:
  producer:
      sendMessageTimeout: 10000
      threads: 10
      retry:
        retries: 10
        delay: 10000

This configuration can be loaded by injecting the ProducerProperties instance:

public class MyClass {

  @Inject
  ProducerProperties producerProperties;
  ...

Getting a Producer instance

To get a new producer instance do the following:

public class MyClass {

  @Inject
  MessageQueueProvider mqProvider;
  ...
  
  protected void myMethod() {
    try {
      var myProducer = mqProvider.getProducer();    
    ...
    }
  }

Sending a message

Async Messages

producer.asyncSend(newMessage,
        (message) -> messageSuccessHandler(message),
        (message, exception) -> messageFailureHandler(message, exception));

The asyncSend method takes the following parameters:

| Parameter | Type | Description | |----------------|:------------------------------:|------------------------------------------------------------------------------------------------------------------| | message | Message | The Message to send. | | successHandler | Consumer<Message> | A consumer function to execute when the message is succesfully sent. | | failureHandler | BiConsumer<Message, Exception> | A biconsumer function to execute when there is an error sending the message and the retries have been exhausted. |

Sync Messages

producer.syncSend(newMessage);

The syncSend method takes the following parameters:

| Parameter | Type | Description | |----------------|:------------------------------:|-----------------------------------| | message | Message | The Message to send. |

Providers

RabbitMQ

To enable the RabbitMQ provider, set the messageQueue.provider property with rabbitmq as value.

Configuration

| Property | Type | Default | Description | |------------------------|:-------:|:------------:|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | host | String | localhost | The RabbitMQ host to connect to. | | port | Integer | 5672 | The port to connect to. | | user | String | | The user in case authentication is required | | password | String | | The password for the user | | virtualHost | String | / | The RabbitMQ virtual host to connect to. | | uri | String | | The complete RabbitMQ url. For example [amqps://user:pass@localhost:5671/%2f]. Setting this property will overwrite host, port, user, password and virtualHost. | | exchangeName | String | pdp.exchange | The name of the exchange to send the messages to. | | tls.enabled | Boolean | false | Whether TLS is enabled or not on RabbitMQ. This assumes the required certificates come from a known CA or have already been registered into the JVM cacerts keystore. | | tls.tlsVersion | String | TLSv1.2 | TLS version being used. | | tls.p12KeyPath | String | | Path to the P12 key. | | tls.p12KeyPassphrase | String | | Passphrase for the P12 key. |

####RabbitMQ Producer Configuration The following are the RabbitMQ specific configuration properties for a producer.

| Property | Type | Default | Description | |--------------------|:-------:|:-------:|--------------------------------------------------------------------------------------------------------| | minIdleChannels | Integer | 5 | The minimum number of idle channels to leave on the channel pool. | | maxIdleChannels | Integer | 10 | The maximum number of idle channels to leave on the channel pool. | | maxOpenChannels | Integer | 20 | The maximum number of open channels to have at the same time. This includes idle and in use channels. |

Configuration Example

messageQueue:
  provider: rabbitmq
  uri: amqps://user:pass@localhost:5671/%2f
  exchangeName: testExchange
  tls:
    enabled: true
    p12KeyPath: C:/certs/rmq.p12
    p12KeyPassphrase: rmqpass
  producer:
    minIdleChannels: 2
    maxIdleChannels: 4
    maxOpenChannels: 6
    sendMessageTimeout: 10000
    threads: 10
    retry:
      retries: 10
      delay: 10000
  consumer:
    directConsumer:
      maxMessages: 4
      mmaxMessageRequeues: 6
    broadcastConsumer:
      maxMessages: 3
      maxMessageRequeues: 6
  • No labels