...
Uses the concept of sessions to buckets in order to give a logical separation between different scopes. For example, "config" could be a bucket to store all data related to the configurations scope, and it will be represented as a folder in the local file system provider.
@Inject
protected BinaryDataService binaryDataService;
protected BinaryDataServiceSession session;
@PostConstruct
void init() {
session = binaryDataService.newSession("config");
}
public void storeConfig(String key, InputStream inputStream) {
session.store(key, inputStream);
}
public Optional<InputStream> findConfig(String key) {
return session.find(key);
}
Providers
Local File System
...
storage:
binary:
provider: local
dataPath: other
Configuration parameters
...
storage.binary.provider
(Optional, String) If given, must be local
.
...
Keep in mind that SFTP is known to be slow. Use it only as last resource
Configuration parameters
...
storage.binary.provider
(Required, String) Must be sftp
.
...
(Optional, Duration) The timeout for the reading from the server. Default is 3s
.
Cache Manager
The local cache is an extension to the one provided by Micronaut and implemented with Caffeine. It simplifies the process of caching entities with the use of annotations and configurations in the application.yml
...
Amazon S3
Uses an Amazon S3 bucket as storage:
storage:
cachebinary:
localprovider: s3
myEntitybucket: bucket
region: us-east-1
initialCapacity: 10 dataPath: ingestion
maximumSizecredentials:
50 maximumWeightaccessKey: 100access-key
secretKey: expireAfterWrite: 1h
secret-key
connection:
connectTimeout: 30000
socketTimeout: 30000
expireAfterAccessmaxConnections: 5m 20
awsThrottlingProperties:
recordStatsenabled: true
retriesBeforeThrottling: 3
testModemaxRetries: false5
For a cache named myEntity
with the following properties:
| Property | Type | Default | Description | | ----------------- | :------: | :-----: | ----------- | | initialCapacity | Integer | 16 | The minimum size of the cache | | maximumSize | Long | | The maximum size of the cache. Can not be combined with a weigher | | maximumWeight | Long | | The maximum weight to be allowed for an element in the cache (see Weigher section) | | expireAfterWrite | Duration | | The time to wait to expire an element after its creation | | expireAfterAccess | Duration | 5m | The time to wait to expire an element after the last time it was accessed | | recordStats | boolean | true | To record statistics about hit rates and evictions (see Cache Statistics section) | | testMode | boolean | false | To execute all cache operations in a single thread |
Each cache must have a unique name, which will be automatically normalized to kebab-case (i.e.
myEntity
becomesmy-entity
)
A default configuration for a cache can be defined as a bean:
@Factory
@Requires(missingProperty = LocalCacheProperties.PREFIX + "." + MyEntityCacheConfig.CACHE_NAME)
public class MyEntityCacheConfig {
public static final String CACHE_NAME = "my-entity";
@Bean
@Named(CACHE_NAME)
LocalCacheProperties cacheProperties(ApplicationConfiguration applicationConfiguration) {
return LocalCacheProperties.builder()
.cacheName(CACHE_NAME)
.applicationConfiguration(applicationConfiguration)
.build();
}
}
Weigher
A weigher determines if an element becomes too heavy to be in the cache. If for a cache's maximumWeight
there is not a corresponding named weigher, any other weigher will be selected. If there is no registered weigher, a default weigher where every element has a weight of 1 will be used:
import com.github.benmanes.caffeine.cache.*;
@Singleton
@Named(MyEntityCacheConfig.CACHE_NAME)
public class MyEntityCacheWeigher implements Weigher<UUID, MyEntity> {
@Override
public @NonNegative int weigh(@NonNull UUID key, @NonNull MyEntity value) {
return 0;
}
}
Removal Listener
A listener that triggers every time an element is evicted:
import com.github.benmanes.caffeine.cache.*;
@Singleton
@Named(MyEntityCacheConfig.CACHE_NAME)
public class MyEntityCacheRemovalListener implements RemovalListener<UUID, MyEntity> {
@Override
public void onRemoval(@Nullable UUID key, @Nullable MyEntity value, @NonNull RemovalCause cause) {
// Do something with the event
}
}
Annotation-based caching
Any class (POJOs, Connections, Factories...) can be stored in the cache. For example, instances of the following entity type:
@Data
public class MyEntity implements CoreEntity<UUID> {
private UUID id;
private String name;
...
}
can be cached in any managed bean with the use of io.micronaut.cache.annotation
annotations:
@Cacheable
@CachePut
@CacheInvalidate
@Singleton
@CacheConfig(MyEntityCacheConfig.CACHE_NAME)
public class MyEntityService {
@Inject
protected MyEntityRepository myEntityRepository;
@Cacheable(keyGenerator = CoreEntityKeyGenerator.class)
public List<MyEntity> getAll() {
return myEntityRepository.getAll();
}
@Cacheable
public MyEntity getOne(UUID id) {
return myEntityRepository.getOne(id);
}
@CachePut(keyGenerator = CoreEntityKeyGenerator.class)
public void store(MyEntity myEntity) {
myEntityRepository.store(myEntity);
}
@CacheInvalidate
public MyEntity delete(UUID id) {
return myEntityRepository.delete(id);
}
}
The key for the cacheable object must implement
equals()
andhashCode()
Note that for the getAll()
and store(MyEntity)
methods, a custom key generator needs to be specified. This way the cache will calculate the appropriate key
for each entity. If no generator is defined, the DefaultCacheKeyGenerator
is used.
The
CoreEntityKeyGenerator
can be used with any entity that implementsCoreEntity<T>
Multiple caches can be configured in the same @CacheConfig
, in which case, the name of the used cache must be specified. Likewise, the key for the cached value can be a composite of multiple objects (internally wrapped as a ParametersKey
and generated by a DefaultCacheKeyGenerator
):
@Singleton
@CacheConfig({ "cacheA", "cacheB" })
public class MyMultiCacheService {
@Cacheable("cacheA")
public MyEntity getOneA(UUID id) {
...
}
@Cacheable("cacheB")
public MyEntity getOneB(UUID id, UUID parentId) {
...
}
}
Cache Statistics
If the cache statistics are enabled, they will be published as part of the application metrics:
cache.eviction.weight
- the sum of weights of evicted entriescache.evictions
- the count of cache evictionscache.size
- the estimated number of entries in the cachecache.gets
- the number of times a cache-annotated method has returned an item (regardless if it was cached or not). This metric can be refined with the use of tags:result:hit
- the number of times cache lookup methods have returned a cached valueresult:miss
- the number of times cache lookup methods have returned an uncached value
If the application has multiple caches, the metrics can be filtered with the
cache:my-entity
tag
Collections
Flushable Collection
A data structure that asynchronously flushes its content any time a preconfigured criteria is met. It is backed up by an ArrayList<T>
and it is guaranteed to be thread-safe.
Configuration
Basic Properties
| Property | Type | Default | Description | | ------------ | :------: | :-----: | ----------- | | maxCount | Integer | | The maximum number of elements before flushing. Triggers a Count flush | | maxDataSize | Long | | The maximum size of the collection elements before flushing. Triggers a Data Size flush | | flushAfter | Duration | | The duration before flushing. Triggers a Scheduled flush | | threads | Integer | 5 | The number of threads used to execute the flush event | | flushTimeout | Duration | 10m | The timeout for the flush event |
Properties Template
For a collection of type my-collection
, a set of default properties can be defined as a bean or in the application.yml
:
collections:
flushable:
myCollection:
maxCount: 10
maxDataSize: 1mb
flushAfter: 5m
threads: 10
flushTimeout: 10m
The same configuration can be defined as:
@Factory
@Requires(missingProperty = FlushableCollectionProperties.PREFIX + "." + MyFlushableCollectionConfig.TYPE)
public class MyFlushableCollectionConfig {
public static final String TYPE = "my-collection";
@Bean
@Named(TYPE)
FlushableCollectionProperties collectionProperties() {
return FlushableCollectionProperties.builder()
.type(TYPE)
.maxCount(10)
.maxDataSize(DataSize.ofMegabytes(1).asBytes())
.flushAfter(Duration.ofMinutes(5))
.threads(10)
.flushTimeout(Duration.ofMinutes(10))
.build();
}
}
Each collection definition must have a unique name, which will be automatically normalized to kebab-case (i.e.
myCollection
becomesmy-collection
)
Action Handlers
Flush Handler
Consumer to be called when the flush event is triggered. For instance, a flush handler for a collection of integer elements can be defined as:
public class MyCollectionFlushHandler implements Consumer<FlushableCollection.Batch<Integer>> {
@Override
public void accept(FlushableCollection.Batch<Integer> batch) {
// Do something with the batch
}
}
By default, does nothing: builder.flushHandler(batch -> {})
Weigher
Function to determine the size of an element. Required to trigger a Data Size flush. For instance, a collection of integer elements can define its weigher as:
public class MyCollectionWeigher implements Function<Integer, Long> {
@Override
public Long apply(Integer element) {
return element.toString().length();
}
}
By default, the weight is calculated after converting the element to String and counting its number of bytes using UTF-8 encoding.
Success Handler
Consumer to be executed if the flush handler was successfully executed. For instance, a success handler for a collection of integer elements can be defined as:
public class MyCollectionSuccessHandler implements Consumer<FlushableCollection.Batch<Integer>> {
@Override
public void accept(FlushableCollection.Batch<Integer> batch) {
// Do something with the successful batch
}
}
By default, logs a debug message with the details of the processed batch: builder.successHandler(batch -> log.debug(...))
Failure Handler
BiConsumer to be executed if the flush handler failed. For instance, a failure handler for a collection of integer elements can be defined as:
public class MyCollectionFailureHandler implements Consumer<FlushableCollection.Batch<Integer>, Throwable> {
@Override
public void accept(FlushableCollection.Batch<Integer> batch, Throwable ex) {
// Do something with the failed batch
}
}
By default, logs an error message with the details of the batch and its exception: builder.failureHandler((batch, ex) -> log.error(...))
. If the flush event causes a timeout, the input Throwable
will be of type java.util.concurrent.TimeoutException
.
Usage
The collection must be created through the FlushableCollectionFactory
bean, by providing the expected type. If the application context finds a FlushableCollectionProperties
with the same name, it will be used as template for the new collection. Otherwise, a new properties set with default values will be created. Note that any pre-defined property can be overridden during the build phase.
@Inject
protected FlushableCollectionFactory flushableCollectionFactory;
void submit() {
try (var collection = flushableCollectionFactory.<Integer>builder("my-collection")
.flushHandler(new MyCollectionFlushHandler())
.weigher(new MyCollectionWeigher())
.successHandler(new MyCollectionSuccessHandler())
.failureHandler(new MyCollectionFailureHandler())
.build()) {
for (int i = 0; i < 10; i++) {
collection.add(i);
}
}
}
Flush Events
COUNT
- Triggers if the collection contains more elements than the value defined in themaxCount
property. If undefined, it will never be triggered.DATA_SIZE
- Triggers if the size of the elements in the collection is greater than the value defined in themaxDataSize
property. If undefined, it will never be triggered.SCHEDULED
- Triggers based on the schedule defined with theflushAfter
property. If undefined, it will never be triggered.MANUAL
- Triggers whenever thecollection.flush()
method is called.CLOSE
- Triggers whenever the collection is closed by either using atry
with resources, or by calling thecollection.close()
method.
Flush Metrics
Each collection will publish metrics about the duration of each flush event, its size, and the count of success/failures (see Metrics section).
This can be refined with the use of custom tags during the creation of the collection:
try (var collection = flushableCollectionFactory.<Integer>builder("my-collection")
.tag("key", "value")
.build()) {
Configuration parameters
storage.binary.provider
(Required, String) Must be s3
.
storage.binary.region
(Required, String) The AWS region where the bucket is located.
storage.binary.bucket
(Required, String) The AWS bucket name used for storage.
storage.binary.dataPath
(Optional, String) The path where the data will be stored within a S3 bucket. Default is binary
.
storage.binary.credentials.accessKey
(Required, String) The access key provided by Amazon for connecting to the AWS S3 bucket.
storage.binary.credentials.secretKey
(Required, String) The secret key provided by Amazon for connecting to the AWS S3 bucket.
storage.binary.connection.connectTimeout
(Optional, Integer) Number of milliseconds before the connection times out. Defaults to 10000
.
storage.binary.connection.socketTimeout
(Optional, Integer) Number of milliseconds before the socket times out. Defaults to 50000
.
storage.binary.connection.maxConnections
(Optional, Integer) Number of maximum concurrent connections to AWS S3. Defaults to 50
.
storage.binary.connection.awsThrottlingProperties.enabled
(Optional, Boolean) Whether throttled retries should be used. Defaults to false
.
storage.binary.connection.awsThrottlingProperties.retriesBeforeThrottling
(Optional, Integer) Maximum number of consecutive failed retries that the client will permit before throttling all subsequent retries of failed requests.
storage.binary.connection.awsThrottlingProperties.maxRetries
(Optional, Integer) Maximum number of retry attempts for failed retryable requests.
Google Cloud Storage
Uses a Google Cloud Storage bucket for storage:
storage:
binary:
provider: gcs
gcsBucket: bucket
dataPath: ingestion
Configuration parameters
storage.binary.provider
(Required, String) Must be gcs
.
storage.binary.gcsBucket
(Required, String) The GCS bucket name used for storage.
storage.binary.dataPath
(Optional, String) The path where the data will be stored within a GCS bucket. Default is binary
.
Azure Cloud Storage
Uses an Azure Cloud Storage container for storage. The provider is able to authenticate by using the storageAccount
and setting the Default Azure Credential (recommended) or the use of the connectionString
field by configuring the parameter with the value in AZURE_STORAGE_CONNECTION_STRING .
storage:
binary:
provider: azure
dataPath: ingestion
azureContainer: bucket
connectionString: connectionStringVariable
storageAccount: storageAccountName
Configuration parameters
storage.binary.provider
(Required, String) Must be azure
.
storage.binary.dataPath
(Optional, String) The path where the buckets will be created within a container. Default is binary
.
storage.binary.azureContainer
(Required, String) The Azure Storage Container to connect to.
storage.binary.connectionString
(Optional, String) The connection string value. If present, it will have priority over the DefaultAzureCredentials.
storage.binary.storageAccount
(Optional, String) The storage account name to connect. It is required for default credentials and connectionString
must be null.
Cache Manager
The local cache is an extension to the one provided by Micronaut and implemented with Caffeine. It simplifies the process of caching entities with the use of annotations and configurations in the application.yml
Configuration
storage:
cache:
local:
myEntity:
initialCapacity: 10
maximumSize: 50
maximumWeight: 100
expireAfterWrite: 1h
expireAfterAccess: 5m
recordStats: true
testMode: false
For a cache named myEntity
with the following properties:
Property | Type | Default | Description |
---|---|---|---|
initialCapacity | Integer | 16 | The minimum size of the cache |
maximumSize | Long | The maximum size of the cache. Can not be combined with a weigher | |
maximumWeight | Long | The maximum weight to be allowed for an element in the cache (see Weigher section) | |
expireAfterWrite | Duration | The time to wait to expire an element after its creation | |
expireAfterAccess | Duration | 5m | The time to wait to expire an element after the last time it was accessed |
recordStats | boolean | true | To record statistics about hit rates and evictions (see Cache Statistics section) |
testMode | boolean | false | To execute all cache operations in a single thread |
Each cache must have a unique name, which will be automatically normalized to kebab-case (i.e.
myEntity
becomesmy-entity
)
A default configuration for a cache can be defined as a bean:
@Factory
@Requires(missingProperty = LocalCacheProperties.PREFIX + "." + MyEntityCacheConfig.CACHE_NAME)
public class MyEntityCacheConfig {
public static final String CACHE_NAME = "my-entity";
@Bean
@Singleton
@Named(CACHE_NAME)
LocalCacheProperties cacheProperties(ApplicationConfiguration applicationConfiguration) {
return LocalCacheProperties.builder()
.cacheName(CACHE_NAME)
.applicationConfiguration(applicationConfiguration)
.build();
}
}
Weigher
A weigher determines if an element becomes too heavy to be in the cache. If for a cache's maximumWeight
there is not a corresponding named weigher, any other weigher will be selected. If there is no registered weigher, a default weigher where every element has a weight of 1 will be used:
import com.github.benmanes.caffeine.cache.*;
@Singleton
@Named(MyEntityCacheConfig.CACHE_NAME)
public class MyEntityCacheWeigher implements Weigher<UUID, MyEntity> {
@Override
public @NonNegative int weigh(@NonNull UUID key, @NonNull MyEntity value) {
return 0;
}
}
Removal Listener
A listener that triggers every time an element is evicted:
import com.github.benmanes.caffeine.cache.*;
@Singleton
@Named(MyEntityCacheConfig.CACHE_NAME)
public class MyEntityCacheRemovalListener implements RemovalListener<UUID, MyEntity> {
@Override
public void onRemoval(@Nullable UUID key, @Nullable MyEntity value, @NonNull RemovalCause cause) {
// Do something with the event
}
}
Annotation-based caching
Any class (POJOs, Connections, Factories...) can be stored in the cache. For example, instances of the following entity type:
@Data
public class MyEntity implements CoreEntity<UUID> {
private UUID id;
private String name;
...
}
can be cached in any managed bean with the use of io.micronaut.cache.annotation
annotations:
@Cacheable
@CachePut
@CacheInvalidate
@Singleton
@CacheConfig(MyEntityCacheConfig.CACHE_NAME)
public class MyEntityService {
@Inject
protected MyEntityRepository myEntityRepository;
@Cacheable(keyGenerator = CoreEntityKeyGenerator.class)
public List<MyEntity> getAll() {
return myEntityRepository.getAll();
}
@Cacheable
public MyEntity getOne(UUID id) {
return myEntityRepository.getOne(id);
}
@CachePut(keyGenerator = CoreEntityKeyGenerator.class)
public void store(MyEntity myEntity) {
myEntityRepository.store(myEntity);
}
@CacheInvalidate
public MyEntity delete(UUID id) {
return myEntityRepository.delete(id);
}
}
The key for the cacheable object must implement
equals()
andhashCode()
Note that for the getAll()
and store(MyEntity)
methods, a custom key generator needs to be specified. This way the cache will calculate the appropriate key
for each entity. If no generator is defined, the DefaultCacheKeyGenerator
is used.
The
CoreEntityKeyGenerator
can be used with any entity that implementsCoreEntity<T>
Multiple caches can be configured in the same @CacheConfig
, in which case, the name of the used cache must be specified. Likewise, the key for the cached value can be a composite of multiple objects (internally wrapped as a ParametersKey
and generated by a DefaultCacheKeyGenerator
):
@Singleton
@CacheConfig({ "cacheA", "cacheB" })
public class MyMultiCacheService {
@Cacheable("cacheA")
public MyEntity getOneA(UUID id) {
...
}
@Cacheable("cacheB")
public MyEntity getOneB(UUID id, UUID parentId) {
...
}
}
Cache Statistics
If the cache statistics are enabled, they will be published as part of the application metrics:
cache.eviction.weight
- the sum of weights of evicted entriescache.evictions
- the count of cache evictionscache.size
- the estimated number of entries in the cachecache.gets
- the number of times a cache-annotated method has returned an item (regardless if it was cached or not). This metric can be refined with the use of tags:result:hit
- the number of times cache lookup methods have returned a cached valueresult:miss
- the number of times cache lookup methods have returned an uncached value
If the application has multiple caches, the metrics can be filtered with the
cache:my-entity
tag
Collections
Flushable Collection
A data structure that asynchronously flushes its content any time a preconfigured criteria is met. It is backed up by an ArrayList<T>
and it is guaranteed to be thread-safe.
Configuration
Basic Properties
Property | Type | Default | Description |
---|---|---|---|
maxCount | Integer | The maximum number of elements before flushing. Triggers a Count flush | |
maxDataSize | Long | The maximum size of the collection elements before flushing. Triggers a Data Size flush | |
flushAfter | Duration | The duration before flushing. Triggers a Scheduled flush | |
threads | Integer | 5 | The number of threads used to execute the flush event |
flushTimeout | Duration | 10m | The timeout for the flush event |
Properties Template
For a collection of type my-collection
, a set of default properties can be defined as a bean or in the application.yml
:
collections:
flushable:
myCollection:
maxCount: 10
maxDataSize: 1mb
flushAfter: 5m
threads: 10
flushTimeout: 10m
The same configuration can be defined as:
@Factory
@Requires(missingProperty = FlushableCollectionProperties.PREFIX + "." + MyFlushableCollectionConfig.TYPE)
public class MyFlushableCollectionConfig {
public static final String TYPE = "my-collection";
@Bean
@Named(TYPE)
FlushableCollectionProperties collectionProperties() {
return FlushableCollectionProperties.builder()
.type(TYPE)
.maxCount(10)
.maxDataSize(DataSize.ofMegabytes(1).asBytes())
.flushAfter(Duration.ofMinutes(5))
.threads(10)
.flushTimeout(Duration.ofMinutes(10))
.build();
}
}
Each collection definition must have a unique name, which will be automatically normalized to kebab-case (i.e.
myCollection
becomesmy-collection
)
Action Handlers
Flush Handler
Consumer to be called when the flush event is triggered. For instance, a flush handler for a collection of integer elements can be defined as:
public class MyCollectionFlushHandler implements Consumer<FlushableCollection.Batch<Integer>> {
@Override
public void accept(FlushableCollection.Batch<Integer> batch) {
// Do something with the collectionbatch
}
}
The metrics will then be available in:
GET /metrics/pdp.collections.flushable.[type]
- The count of successful and failed flush events.GET /metrics/pdp.collections.flushable.[type].duration
- The duration for the flush handler.GET /metrics/pdp.collections.flushable.[type].size
- The size of the flushed elements.
DSL
The PDP DSL is an abstract definition for a common language to be used in any PDP product. Its intention is to have a standardized way to express configurations that might be interpreted in a different way according to the needs of the product itself.
Filters
A filter is a criteria to be applied to a given object. In order to use it, the FilterAdapter
interface needs to be implemented. The core supports the following concrete adapters:
MapFilterAdapter
- Converts the filter into a predicate used to evaluateMap<String, Object>
structures. The expected field name is the key of the map.JsonPathFilterAdapter
- Converts the filter into a predicate used to evaluateDocumentContext
structures. The expected field name is a JSON Path to be found in the JSON document.JsonPointerFilterAdapter
- Converts the filter into a predicate used to evaluateObjectNode
structures. The expected field name is a JSON Pointer to be found in the JSON document.
All filters have an optional
source
field that could be used by the concrete implementation to select among multiple data structures
"Equals" Filter
The value of the field must be exactly as the one provided.
var filter = EqualsFilter.builder().field("field").value("value").build();
{
"equals": {
"field": "field",
"value": "value"
}
}
"Greater Than" Filter
The value of the field must be greater than the one provided.
var filter = GreaterThanFilter.builder().field("field").value(1).build();
{
"gt": {
"field": "field",
"value": 1
}
}
"Greater Than or Equals" Filter
The value of the field must be greater than or equals to the one provided.
var filter = GreaterThanOrEqualsFilter.builder().field("field").value(1).build();
{
"gte": {
"field": "field",
"value": 1
}
}
"Less Than" Filter
The value of the field must be less than the one provided.
var filter = LessThanFilter.builder().field("field").value(1).build();
{
"lt": {
"field": "field",
"value": 1
}
}
"Less Than or Equals" Filter
The value of the field must be less than or equals to the one provided.
var filter = LessThanOrEqualsFilter.builder().field("field").value(1).build();
{
"lte": {
"field": "field",
"value": 1
}
}
"In" Filter
The value of the field must be one of the provided values.
var filter = InFilter.builder().field("field").value("valueA").value("valueB").build();
{
"in": {
"field": "field",
"values": [
"valueA",
"valueB"
]
}
}
"Empty" Filter
Checks if a field is empty:
- For a collection,
true
if its size is 0 - For a String,
true
if its length is 0 - For any other type,
true
if it isnull
var filter = EmptyFilter.builder()
.field("field")
.build();
{
"empty": {
"field": "field"
}
}
"Exists" Filter
Checks if a field exists.
...
By default, does nothing: builder.flushHandler(batch -> {})
Weigher
Function to determine the size of an element. Required to trigger a Data Size flush. For instance, a collection of integer elements can define its weigher as:
public class MyCollectionWeigher implements Function<Integer, Long> {
@Override
public Long apply(Integer element) {
return element.toString().length();
}
}
By default, the weight is calculated after converting the element to String and counting its number of bytes using UTF-8 encoding.
Success Handler
Consumer to be executed if the flush handler was successfully executed. For instance, a success handler for a collection of integer elements can be defined as:
public class MyCollectionSuccessHandler implements Consumer<FlushableCollection.Batch<Integer>> {
@Override
public void accept(FlushableCollection.Batch<Integer> batch) {
// Do something with the successful batch
}
}
By default, logs a debug message with the details of the processed batch: builder.successHandler(batch -> log.debug(...))
Failure Handler
BiConsumer to be executed if the flush handler failed. For instance, a failure handler for a collection of integer elements can be defined as:
public class MyCollectionFailureHandler implements Consumer<FlushableCollection.Batch<Integer>, Throwable> {
@Override
public void accept(FlushableCollection.Batch<Integer> batch, Throwable ex) {
// Do something with the failed batch
}
}
By default, logs an error message with the details of the batch and its exception: builder.failureHandler((batch, ex) -> log.error(...))
. If the flush event causes a timeout, the input Throwable
will be of type java.util.concurrent.TimeoutException
.
Usage
The collection must be created through the FlushableCollectionFactory
bean, by providing the expected type. If the application context finds a FlushableCollectionProperties
with the same name, it will be used as template for the new collection. Otherwise, a new properties set with default values will be created. Note that any pre-defined property can be overridden during the build phase.
@Inject
protected FlushableCollectionFactory flushableCollectionFactory;
void submit() {
try (var collection = flushableCollectionFactory.<Integer>builder("my-collection")
.flushHandler(new MyCollectionFlushHandler())
.weigher(new MyCollectionWeigher())
.successHandler(new MyCollectionSuccessHandler())
.failureHandler(new MyCollectionFailureHandler())
.build()) {
for (int i = 0; i < 10; i++) {
collection.add(i);
}
}
}
Flush Events
COUNT
- Triggers if the collection contains more elements than the value defined in themaxCount
property. If undefined, it will never be triggered.DATA_SIZE
- Triggers if the size of the elements in the collection is greater than the value defined in themaxDataSize
property. If undefined, it will never be triggered.SCHEDULED
- Triggers based on the schedule defined with theflushAfter
property. If undefined, it will never be triggered.MANUAL
- Triggers whenever thecollection.flush()
method is called.CLOSE
- Triggers whenever the collection is closed by either using atry
with resources, or by calling thecollection.close()
method.
Flush Metrics
Each collection will publish metrics about the duration of each flush event, its size, and the count of success/failures (see Metrics section).
This can be refined with the use of custom tags during the creation of the collection:
try (var collection = flushableCollectionFactory.<Integer>builder("my-collection") .fieldtag("key", "fieldvalue") .build());
{
"exists": { "field": "field" } }
"Not" Filter
Negates the inner clause.
var filter = NotFilter.builder()
.clause(EqualsFilter.builder().field("field").value("value").build())
.build();
{
"not": {
"equals": {
"field": "field",
"value": "value"
}
}
}
"Null" Filter
Checks if a field is null. Note that while the "exists" filter checks whether the field is present or not, the "null" filter expects the field to be present but with null
value.
var filter = NullFilter.builder()
.field("field")
.build();
{
"null": {
"field": "field"
}
}
Boolean Operators
"And" Filter
All conditions in the list must be evaluated to true
.
var filter = AndFilter.builder()
.clause(// Do something with the collection
}
The metrics will then be available in:
GET /metrics/pdp.collections.flushable.[type]
- The count of successful and failed flush events.GET /metrics/pdp.collections.flushable.[type].duration
- The duration for the flush handler.GET /metrics/pdp.collections.flushable.[type].size
- The size of the flushed elements.
DSL
The PDP DSL is an abstract definition for a common language to be used in any PDP product. Its intention is to have a standardized way to express configurations that might be interpreted in a different way according to the needs of the product itself.
Filters
A filter is a criteria to be applied to a given object. In order to use it, the FilterAdapter
interface needs to be implemented. The core supports the following concrete adapters:
MapFilterAdapter
- Converts the filter into a predicate used to evaluateMap<String, Object>
structures. The expected field name is the key of the map.JsonPathFilterAdapter
- Converts the filter into a predicate used to evaluateDocumentContext
structures. The expected field name is a JSON Path to be found in the JSON document.JsonPointerFilterAdapter
- Converts the filter into a predicate used to evaluateObjectNode
structures. The expected field name is a JSON Pointer to be found in the JSON document.
All filters have an optional
source
field that could be used by the concrete implementation to select among multiple data structures
"Equals" Filter
The value of the field must be exactly as the one provided.
var filter = EqualsFilter.builder().field("fieldAfield").value("valueAvalue").build());
{ "equals": {
.clause(EqualsFilter"field": "field", "value": "value" } }
"Greater Than" Filter
The value of the field must be greater than the one provided.
var filter = GreaterThanFilter.builder().field("fieldBfield").value("valueB"1).build())
.build();
{
"andgt": [
{
"equalsfield": {
"field": "fieldA",
"value": "valueA"
1
}
}, {
"equals":
"Greater Than or Equals" Filter
The value of the field must be greater than or equals to the one provided.
var filter = GreaterThanOrEqualsFilter.builder().field("field").value(1).build();
{
"gte": {
"field": "fieldBfield",
"value": "valueB"1
}
}
]
}
"
...
Less Than" Filter
At least one condition in the list must be evaluated to true
The value of the field must be less than the one provided.
var filter = OrFilter.builder()
.clause(EqualsFilterLessThanFilter.builder().field("fieldAfield").value("valueA"1).build());
{ "lt": {
.clause(EqualsFilter"field": "field", "value": 1 } }
"Less Than or Equals" Filter
The value of the field must be less than or equals to the one provided.
var filter = LessThanOrEqualsFilter.builder().field("fieldBfield").value("valueB").build())
1).build();
{
"or": [
{
"equalslte": {
"field": "fieldAfield",
"value": "valueA"1
}
}
}, {
"In" Filter
The value of the field must be one of the provided values.
var filter = InFilter.builder().field("field").value("valueA").value("valueB").build();
{
"equalsin": {
"field": "fieldBfield",
"values": [
"value": "valueBvalueA",
}"valueB"
}]
]}
}
HTTP Base Client
The HTTP Base Client is an abstraction of the OkHttpClient. It provides features such as retry handlers and automated paginated requests.
The minimum initialization for the client defines the configuration of the HTTP connections and other shared properties to be used by the endpoints:
...
"Empty" Filter
Checks if a field is empty:
- For a collection,
true
if its size is 0 - For a String,
true
if its length is 0 - For any other type,
true
if it isnull
var filter = EmptyFilter.builder()
.field("field")
@NonNull OkHttpClient client,.build();
{ "empty": { "field": "field"
@NonNullObjectMapper
objectMapper,
} }
"Exists" Filter
Checks if a field exists.
var filter = ExistsFilter.builder()
@NonNull BackoffPolicyProperties backoffPolicy,
.field("field")
MeterRegistry meterRegistry,
.build();
{ "exists": { "field": "field"
@NonNullScrollProperties
}
scroll) {}
"Not" Filter
Negates the inner clause.
var filter = NotFilter.builder()
super(client, objectMapper, backoffPolicy, scroll);
} .clause(EqualsFilter.builder().field("field").value("value").build())
@Override public boolean ping.build();
{ "not": {
returntrue;
}
"equals": {
publicstatic
Builder builder() {
"field": "field",
returnnew Builder();
"value": "value"
}}
publicstatic
}
final class Builder extends HttpBaseClientBuilder<Builder, MyClient> { @Override}
"Null" Filter
Checks if a field is null. Note that while the "exists" filter checks whether the field is present or not, the "null" filter expects the field to be present but with null
value.
var filter = NullFilter.builder()
public MyClient build(.field("field")
{ return new MyClient(.build();
{ "null": { "field": "field" }
newClientBuilder().build(),}
Boolean Operators
"And" Filter
All conditions in the list must be evaluated to true
.
var filter = AndFilter.builder()
.clause(EqualsFilter.builder().field("fieldA").value("valueA").build())
.clause(EqualsFilter.builder().field("fieldB").value("valueB").build())
getObjectMapper.build(),;
{ "and": [ {
getBackoffPolicy(),"equals": { "field": "fieldA",
getScroll());}
} }
The builder can be also initialized with a HttpBaseClientProperties
instance:
public class MyClient extends HttpBaseClient { ... public static Builder builder(HttpBaseClientProperties clientProperties)"value": "valueA" } }, {
returnnew Builder(clientProperties);
"equals": {
}@NoArgsConstructor
public static final class Builder extends HttpBaseClientBuilder<Builder, MyClient> { protected Builder(HttpBaseClientProperties clientProperties) { super(clientProperties); }
"field": "fieldB", "value": "valueB" } } ] }
"Or" Filter
At least one condition in the list must be evaluated to true
.
var filter = OrFilter.builder()
.clause(EqualsFilter.builder().field("fieldA").value("valueA").build())
} }
If some configurations are not desired on the implemented client, they can be disabled in the builder:
public class MyClient extends HttpBaseClient {
...
@HttpBaseClientBuilder.DisabledConfig({ .clause(EqualsFilter.builder().field("fieldB").value("valueB").build())
.build();
{ "or": [ { "equals": {
HttpBaseClientBuilder.Config.COMPRESS_REQUESTS,"field": "fieldA",
HttpBaseClientBuilder.Config.FOLLOW_REDIRECTS,"value": "valueA"
HttpBaseClientBuilder.Config.SCROLL} }
), { "equals": {
publicstatic
final
class
Builder
extends HttpBaseClientBuilder<Builder, MyClient> {
"field": "fieldB", "value": "valueB" }
...} ] }
any attempt to set a disabled configuration will throw an UnsupportedOperationException
.
By default, the retry of the executeWithBackoff(...)
method will happen if the HTTP response code is:
- 408 - Request Time Out
- 429 - Too Many Request
- 500 - Internal Server Error
- 504 - Gateway Timeout
The condition can be changed by sending a different predicate to the method:
client.executeWithBackoff(requestDetails, response -> false);
In this case, the request will not be retried because of a status code (although it might be retried because of an exception during the execution).
HTTP Round Robin Client
The HTTP Round Robin Client is a specialization of the HttpBaseClient
backed up by a round-robin collection. Provides the corresponding classes as described in the previous section: HttpRoundRobinClient
, HttpRoundRobinClientBuilder
, and HttpRoundRobinClientProperties
.
HTTP Client Request
The request object is a simple POJO with the properties needed to execute the HTTP call:
@RequiredArgsConstructor
public class MyRequest implements HttpClientRequest {
private final String id;
}
For cases when no data is needed in the request, an empty implementation is avaliable in
HttpRoundRobinClient#EMPTY_REQUEST
HTTP Client Pageable Request
The HttpClientPageableRequest
interface is an extension of HttpClientRequest
and defines default methods to handle pagination parameters.
HTTP Client Response
Single Response
Expected when the response is a single payload.
public class MySinglePayloadResponse extends AbstractSingleResponse<MyRequest> {
public MySinglePayloadResponse(
ObjectMapper objectMapper,
HttpClientRequestDetails<MyRequest> requestDetails,
Response httpResponse
) {
super(objectMapper, requestDetails, httpResponse);
}
@Override
protected void handleSuccess(Response httpResponse) {
// Do something with the response
}
}
With this definition, the endpoint can be exposed in the client:
public class MyClient extends HttpRoundRobinClient {
...
public MySinglePayloadResponse get(MyRequest clientRequest) {
var url = serverHosts.next()
.newBuilder()
.addPathSegment("content")
.addPathSegment(request.getId())
.build();
var requestDetails = new HttpClientRequestDetails<>(
new Request.Builder().url(url).get().build(),
clientRequest);
return new MySinglePayloadResponse(
objectMapper,
requestDetails,
executeWithBackoff(requestDetails)
);
}
...
}
Both handleSuccess(Response)
and handleFailure(Response)
have a default implementation that can be overridden. A response is considered a success if its HTTP status code is in the 200 to 299 range and its body is not null
(not to be confused with an empty body). This check can be changed in the constructor:
public class MySinglePayloadResponse extends AbstractSingleResponse<MyRequest> {
public MySinglePayloadResponse(
ObjectMapper objectMapper,
HttpClientRequestDetails<MyRequest> requestDetails,
Response httpResponse
) {
super(objectMapper, requestDetails, httpResponse, r -> r.code() == 400);
}
}
If the response was a failure, but it had a payload, it can be retrieved with AbstractSingleResponse#getErrorMessage()
, or AbstractSingleResponse#getErrorMessage(Class)
(to convert the payload into the given class type).
Iterable Response
Expected when the response is a full collection of iterable elements with no pagination. It is assumed that the payload of the response is a JSON array (if that's not the case, the handleSuccess(Response httpResponse)
method must be overridden).
public class MyIterableResponse extends AbstractIterableResponse<MyRequest, String> {
public MyIterableResponse(
ObjectMapper objectMapper,
HttpClientRequestDetails<MyRequest> requestDetails,
Response httpResponse
) {
super(objectMapper, requestDetails, httpResponse);
}
@Override
protected String parseElement(JsonNode jsonNode) {
return jsonNode.asText();
}
}
With this definition, the endpoint can be exposed in the client:
public class MyClient extends HttpRoundRobinClient {
...
public MyIterableResponse list(MyRequest clientRequest) {
var url = serverHosts.next()
.newBuilder()
.addPathSegment("list")
.addPathSegment(request.getId())
.build();
var requestDetails = new HttpClientRequestDetails<>(
new Request.Builder().url(url).get().build(),
clientRequest);
return new MyIterableResponse(
objectMapper,
requestDetails,
executeWithBackoff(requestDetails)
);
}
...
}
Each element in the response can then be retrieved by using the Iterable<>
interface.
The predicate to determine if a response was successful can be provided in the constructor, the same way as in the previous sections.
Paginated Response
Token Page
Expected when the response is a consecutive list of pages obtained with a token.
Token Page Response
The response corresponds to each page from the pagination requests. Since it is not possible to automatically determine where is the token for the next page, the AbstractTokenPageResponse#handleSuccess(Response)
method must be implemented.
public class MyTokenPageResponse extends AbstractTokenPageResponse<MyRequest, String> {
public MyTokenPageResponse(
ObjectMapper objectMapper,
HttpClientRequestDetails<MyRequest> requestDetails,
Response httpResponse,
String currentToken
) {
super(objectMapper, requestDetails, httpResponse, currentToken);
}
@Override
protected void handleSuccess(Response response) {
try {
var json = objectMapper.readTree(Objects.requireNonNull(response.body()).byteStream());
this.nextPageReference = json.get("token").asText();
if (json.has("content")) {
this.elements = new JsonLazyIterator<>(json.get("content"), this::parseElement);
} else {
this.elements = Collections.emptyIterator();
}
} catch (Exception ex) {
throw new DataException(ex.getMessage(), ex);
}
}
@Override
protected String parseElement(JsonNode jsonNode) {
return jsonNode.asText();
}
}
The predicate to determine if a response was successful can be provided in the constructor, the same way as in the previous sections.
Token Page Collection
The response corresponds to a collection of all the page responses that iterates through the data.
public class MyTokenPageCollectionResponse extends AbstractTokenCollectionResponse<MyRequest, String, MyTokenPageResponse> {
public MyTokenPageCollectionResponse(Function<String, MyTokenPageResponse> pageFunction) {
super(pageFunction);
}
}
With this definition, the endpoint can be exposed in the client:
public class MyClient extends HttpRoundRobinClient {
...
public MyTokenPageCollectionResponse paginatedByToken(MyRequest clientRequest) {
return new MyTokenPageCollectionResponse(
token -> {
var urlBuilder = serverHosts.next()
.newBuilder()
.addPathSegment("list")
.addPathSegment(request.getId())
.addQueryParameter("limit", String.valueOf(scroll.getSize()));
if (token != null) {
urlBuilder.addQueryParameter("token", token);
}
var requestDetails = new HttpClientRequestDetails<>(
new Request.Builder().url(urlBuilder.build()).get().build(),
clientRequest);
return new MyTokenPageResponse(
objectMapper,
requestDetails,
executeWithBackoff(requestDetails),
token
);
}
);
}
...
}
Note that in case of a null
token, the function must return the first page.
Each element in the response can then be retrieved by using the Iterable<>
interface. The pagination will be handled behind the scenes.
Offset Page
Expected when the response is a consecutive list of pages obtained with a page number.
Offset Page Response
The response corresponds to each page from the pagination requests. It is assumed that the payload of the response is a JSON array (if that's not the case, the handleSuccess(Response httpResponse)
method must be overridden).
It is also assumed that the AbstractOffsetPageResponse#getNextPageReference()
is always 1 after the current page. This method can be overridden for custom behaviors.
public class MyOffsetPageResponse extends AbstractOffsetPageResponse<MyRequest, String> {
public MyOffsetPageResponse(
ObjectMapper objectMapper,
HttpClientRequestDetails<MyRequest> requestDetails,
Response httpResponse,
Long currentOffset
) {
super(objectMapper, requestDetails, httpResponse, currentOffset);
}
@Override
protected String parseElement(JsonNode jsonNode) {
return jsonNode.asText();
}
}
The predicate to determine if a response was successful can be provided in the constructor, the same way as in the previous sections.
Pageable Page Response
The response is a specialization of the Offset Page Response, and it is based on a default page structure:
{
"totalPages": 0,
"totalElements": 0,
"size": 0,
"content": [
{}
],
"number": 0,
"first": true,
"last": true,
"sort": {
"sorted": true,
"unsorted": true,
"empty": true
},
"numberOfElements": 0,
"pageable": {
"page": 0,
"size": 0,
"sort": [
"string"
]
},
"empty": true
}
Offset Page Collection
The response corresponds to a collection of all the page responses that iterates through the data.
public class MyOffsetPageCollectionResponse extends AbstractOffsetCollectionResponse<MyRequest, String, MyOffsetPageResponse> {
public MyOffsetPageCollectionResponse(Function<String, MyOffsetPageResponse> pageFunction) {
super(pageFunction);
}
}
With this definition, the endpoint can be exposed in the client:
public class MyClient extends HttpRoundRobinClient {
...
public MyOffsetPageCollectionResponse paginatedByOffset(MyRequest clientRequest) {
return new MyOffsetPageCollectionResponse(
offset -> {
var urlBuilder = serverHosts.next()
.newBuilder()
.addPathSegment("list")
.addPathSegment(request.getId())
.addQueryParameter("limit", String.valueOf(scroll.getSize()));
if (token != null) {
urlBuilder.addQueryParameter("page", String.valueOf(offset));
}
var requestDetails = new HttpClientRequestDetails<>(
new Request.Builder().url(urlBuilder.build()).get().build(),
clientRequest);
return new MyOffsetPageResponse(
objectMapper,
requestDetails,
executeWithBackoff(requestDetails),
offset
);
}
);
}
...
}
Note that in case of a null
token, the function must return the first page.
Each element in the response can then be retrieved by using the Iterable<>
interface. The pagination will be handled behind the scenes.
JSON
JSON handling is done with Jackson Project, and the default configuration for the mapper can be instantiated with com.pureinsights.pdp.core.config.CoreSerializer.newInstance()
.
Default Object Mapper
Registered Modules
JavaTimeModule
to handle the JSR-310: Date and Time API
Registered Deserializers
com.pureinsights.pdp.core.time.CoreDurationDeserializer
as an extension of the defaultDurationDeserializer
to allow values in human-readable format (i.e. "1s", "12m", "5h"...). If no suffix is provided, milliseconds are assumed. The supported suffixes are:d
- daysh
- hoursm
- minutess
- secondsms
- millisecondsns
- nanoseconds
Enabled Features
MapperFeature.ACCEPT_CASE_INSENSITIVE_ENUMS
MapperFeature.ACCEPT_CASE_INSENSITIVE_VALUES
DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY
Disabled Features
SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS
SerializationFeature.WRITE_DATES_AS_TIMESTAMPS
DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES
DeserializationFeature.ADJUST_DATES_TO_CONTEXT_TIME_ZONE
Logging Handler
The logging handler simplifies the process of log analysis by using a JSON output with support for custom POJOs.
Logback Configuration
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="net.logstash.logback.encoder.LogstashEncoder">
<provider class="com.pureinsights.pdp.core.logging.CoreArgumentsProvider"/>
</encoder>
</appender>
<root level="info">
<appender-ref ref="STDOUT"/>
</root>
</configuration>
Custom POJO
@Getter
public class MyPOJO {
private final UUID id = UUID.randomUUID();
}
public class MyPOJOLoggingHandler implements PojoLoggingHandler<MyPOJO> {
private static final String PARENT_LABEL = "myPOJO";
private static final String ID_LABEL = "id";
@Override
public Class<MyPOJO> getSupportedType() {
return MyPOJO.class;
}
@Override
public void write(JsonGenerator generator, MyPOJO pojo) throws IOException {
generator.writeFieldName(PARENT_LABEL);
generator.writeStartObject();
generator.writeObjectField(ID_LABEL, pojo.getId());
generator.writeEndObject();
}
}
In order to invoke the handler, the class type can call the logging framework:
log.info("Message", new MyPOJO());
log.info("Message {}", new MyPOJO());
log.info("Message {}", "with placeholder", new MyPOJO());
{"@timestamp":"2021-12-08T15:07:11.379Z","@version":1,"message":"Message","logger_name":"LoggerTest","thread_name":"Test worker","level":"INFO","level_value":20000,"myPOJO":{"id":"707ea126-d94b-47b6-9010-72a3acc8a786"}}
{"@timestamp":"2021-12-08T15:07:11.380Z","@version":1,"message":"Message MyPOJO$373141eb","logger_name":"LoggerTest","thread_name":"Test worker","level":"INFO","level_value":20000,"myPOJO":{"id":"398bb1df-29b3-4738-afc6-a7b9e892ce19"}}
{"@timestamp":"2021-12-08T15:07:11.381Z","@version":1,"message":"Message with placeholder","logger_name":"LoggerTest","thread_name":"Test worker","level":"INFO","level_value":20000,"myPOJO":{"id":"00ba3228-f434-4ed0-b45e-62ec9163aac4"}}
Note that the custom POJO is always displayed as expected. The example also demonstrates that the POJO must not have a corresponding placeholder in the message as it will be automatically resolved by the provider. Exceptions must be the in the last position of the input:
...
} catch (Exception ex) {
log.error(ex.getMessage(), new MyPOJO(), ex);
}
If the logging handler is part of the PDP Core Common Libraries project, it must be registered in the static section of the com.pureinsights.pdp.core.logging.CoreArgumentsProvider
class.
Any other logging handler under an application context, must be declared as a bean (i.e. @Singleton
).
If none of the above is applicable, the handler must be registered using the CoreArgumentsProvider#registerHandler(PojoLoggingHandler)
method.
Metrics
The metrics of the application are collected and exposed by Micrometer. They are enabled by default in the application.yml
and exposed through the GET /metrics
and GET /metrics/[name]
endpoints:
micronaut:
metrics:
enabled: true
{
"names": [
"http.server.requests",
"jvm.buffer.count",
"jvm.buffer.memory.used",
...,
"pdp.metric.count"
]
}
When available, the metrics can be filtered with the use of tags:
GET /metrics/pdp.metric.count?tag=source:component&tag=type:value1
By default, all metrics include the
component
tag with the name of the application
Annotation-based metrics
Any method in any bean can be measured with:
@Timed
@Counted
@Singleton
public class MyService {
@Timed(value = "pdp.myservice.execute", histogram = true)
public void execute(UUID id) {
...
}
@Counted("pdp.myservice.validate")
public void validate(UUID id) {
...
}
}
The name of the metric must be in lowercase, using dot notation
Custom metrics
Instead of using annotations, the metric registry can be injected and manually handled in any bean, with any type of meter:
@Singleton
public class MyCustomService {
private Counter counter;
@PostConstruct
void init(MeterRegistry meterRegistry) {
counter = Counter.builder("pdp.mycustomservice.count")
.tag("key", "value")
.register(meterRegistry);
}
public void count() {
counter.increment();
}
}
Retries
RetryUtils
, as its name may suggest, provide utils or methods to handle the retry of a Runnable
or Supplier
. It also handles the exceptions might occur during the run.
By default, the RetryUtils
will use a ConstantDelayRetry
, wich will retry 3 times with a delay of 5 seconds.
...
AtomicInteger atomicInteger = new AtomicInteger();
RetryUtils.retry(atomicInteger::getAndIncrement)
You can also specify a custom retry policy passing it through parameters.
...
AtomicInteger atomicInteger = new AtomicInteger();
RetryUtils.retry(atomicInteger::getAndIncrement, new ConstantDelayRetry(2, 100))
In case of any failure during the retry an exception will be thrown and the RetryUtils
will handle it. When the number of retries reaches the specified in the retry policy a CoreException
will be thrown.
Scheduler
Scheduled tasks are handled by Quartz. Any application can inject the org.quartz.Scheduler
bean to register/unregister its own cron jobs.
Any execution of a org.quartz.Job
implementation will automatically perform the dependency injection of its visible fields without the need of declaring the class as a singleton.
Message Queue Provider
To handle communication between the different components, PDP uses a message queue. The following is a quick overview of the available configuration fields, examples on how to use the Message Queue classes and the current implementations available with configuration examples.
Configuration
Basic Configuration
| Property | Type | Default | Description | |---------------------------|:------:|:-------:|----------------------------------------------------------------------------------------------| | messageQueue.provider
| String | | The name of the MessageQueue provider to use. (e.g. rabbitmq to load the RabbitMQ provider). |
Using the MessageQueueProvider
The MessageQueueProvider instance can be injected as follows:
public class MyClass {
@Inject
MessageQueueProvider mqProvider;
...
Messages
Message Properties
| Property | Type | Default | Description | |---------------|:-------------------:|:-------:|-------------------------------------------------------------------------------------------| | body
| byte[] | | The body of the message. | | queue
| String | | The queue this message was sent to or delivered from. | | type
| Message.MessageType | | The type of the message. Can be DIRECT or BROADCAST. See Message Types. | | properties
| Map<String,String> | | Map of message properties. |
Message Types
Direct Messages
Direct messages are sent to a given queue and are meant to be consumed by a single consumer. The message is persisted in the queue until a consumer is available.
Broadcast Messages
Broadcast messages are sent to a given queue and are meant to be consumed by all consumers available at the moment of sending. The message is not persisted if no consumers are available.
Creating a new Message
The Message class provides a builder to simplify the message creation process:
Message newMessage = Message.builder()
.body("My message")
.type(Message.MessageType.DIRECT)
.queue("queue")
.property("key", "value")
.build();
Some considerations with the Message builder:
- There are multiple ways to set the message body. If more than one is set, only the last one will be used.
- Every time the
property
method is called, the given property is appended to the map. - Calling
properties
method will overwrite any previously set properties.
Consumers
Configuration
| Property | Type | Default | Description | |---------------------------------------|:-------:|:-------:|-------------------------------------------------------------------------------| | consumer.{name}.maxMessages
| Integer | 5 | Maximum number of messages to consume in parallel. | | consumer.{name}.maxMessageRequeues
| Integer | 5 | Maximum number of times a message should be sent back to the queue for retry. |
Example
messageQueue:
consumer:
directConsumer:
maxMessages: 4
maxMessageRequeues: 6
This configuration can be loaded as follows
protected void myMethod(@Named("direct-consumer") ConsumerProperties consumerProperties) {
try {
var maxRequeues = consumerProperties.getMaxMessageRequeues();
...
}
}
Registering Consumers
To register a consumer do the following:
public class MyClass {
@Inject
MessageQueueProvider mqProvider;
...
protected void myMethod(@Named("direct-consumer") ConsumerProperties consumerProperties) {
try {
mqProvider.registerConsumer(queue, consumerProperties, Message.MessageType.DIRECT, this::consumeMessage);
...
}
}
The registerConsumer
method takes the following parameters:
| Parameter | Type | Description | |--------------------|:--------------------:|----------------------------------------------------------------------------------------------------------------------------------------------------------------------| | queue | String | Name of the queue to register the Consumer to. | | consumerProperties | ConsumerProperties | A ConsumerProperties instance for the consumer. | | messageType | Message.MessageType | The type of message this consumer is supposed to listen to. Can be Message.MessageType.DIRECT or Message.MessageType.BROADCAST. See Message Types. | | onMessageDelivery | Predicate<Message> | The function to execute when a message is received. Returns true if the message is successfully consumed, false otherwise. |
Producers
Configuration
| Property | Type | Default | Description | |-------------------------------|:--------:|:-------:|--------------------------------------------------------------------------------------------------| | producer.threads
| Integer | 5 | Number of threads available for producers to send asyncMessages. | | producer.sendMessageTimeout
| Duration | 30s | Maximum time to wait to get a confirmation that a message has been successfully sent. | | producer.retry.retries
| Integer | 5 | Maximum number of times to try to send a message. | | producer.retry.delay
| Duration | 1s | Time to wait before retrying. The time is multiplied by the number of retries to backoff between executions. |
Example
messageQueue:
producer:
sendMessageTimeout: 10000
threads: 10
retry:
retries: 10
delay: 10000
This configuration can be loaded by injecting the ProducerProperties instance:
public class MyClass {
@Inject
ProducerProperties producerProperties;
...
Getting a Producer instance
To get a new producer instance do the following:
public class MyClass {
@Inject
MessageQueueProvider mqProvider;
...
protected void myMethod() {
try {
var myProducer = mqProvider.getProducer();
...
}
}
EvalEx
EvalEx is a lightweight library capable of processing numeric, logical, conditional, String, array and structure-based expressions at runtime. It doesn’t need any external dependencies. Some minor functionalities such as support for hexadecimal and implicit multiplication are present. None of the functions are null safe due to the conversion of the parameters to EvaluationValue
during processing.
Example of basic expression evaluation without variables or configuration.
var input = "5 * 2";
var expression = new Expression(input);
var rawResult = expression.evaluate();
int finalResult = rawResult.getNumberValue().intValue();
If the expression has variables, a Map with the value of said variables must be provided.
var input = "x * y";
Map<String, Object> variables = new HashMap<>();
variables.put("x", "5");
variables.put("y", "2");
var expression = new Expression(input);
var rawResult = expression.withValues(variables).evaluate();
int finalResult = rawResult.getNumberValue().intValue();
Exceptions
The library provides two new exceptions which must be handled. A ParseException
is thrown, when the expression could not be parsed. An EvaluationException
is thrown, when the expression could be parsed, but not evaluated. If possible, both exceptions report the following details:
- Start Position of the error (character position, starting with 1).
- End Position of the error (character position, starting with 1).
- The Token string, usually the operator, function, variable or literal causing the error.
- The error message.
Default Functions
Although the default operators and functions are not explicitly listed in the official documentation, they can be seen in the ExpressionConfiguration
class.
Function | Description | Example |
---|---|---|
ABS | Returns the absolute value of a value | ABS(-7) = 7 |
CEILING | Rounds a number towards positive infinity | CEILING(1.1) = 2 |
FACT | Returns the factorial of a number | FACT(5) = 120 |
FLOOR | Rounds a number towards negative infinity | FLOOR(1.9) = 1 |
IF | Conditional operation where if the first parameter is true, the second one is executed. If not, the third parameter is executed | IF(TRUE, 5+1, 6+2) = 8 |
LOG | Performs the logarithm with base e on a value | LOG(5) = 1.609 |
LOG10 | Performs the logarithm with base 10 on a value | LOG10(5) = 0.698 |
MAX | Returns the highest value from all the parameters provided | MAX(5, 55, 6, 102) = 102 |
MIN | Returns the lowest value from all the parameters provided | MIN(5, 55, 6, 102) = 5 |
NOT | Negates a boolean expression | NOT(True) = false |
RANDOM | Returns random number between 0 and 1 | RANDOM() = 0.1613... |
ROUND | Rounds a decimal number to a specified scale | ROUND(0.5652,2) = 0.57 |
SUM | Returns the sum of the parameters | SUM(0.5, 3, 1) = 4.5 |
SQRT | Returns the square root of the value provided | SQRT(4) = 2 |
ACOS | Returns the arc-cosine in degrees | ACOS(1) = 0 |
ACOSH | Returns the hyperbolic arc-cosine in degrees | ACOSH(1.5) = 0.96 |
ACOSR | Returns the arc-cosine in radians | ACOSR(0.5) = 1.04 |
ACOT | Returns the arc-co-tangent in degrees | ACOT(1) = 45 |
ACOTH | Returns the hyperbolic arc-co-tangent in degrees | ACOTH(1.003) = 3.141 |
ACOTR | Returns the arc-co-tangent in radians | ACOTR(1) = 0.785 |
ASIN | Returns the arc-sine in degrees | ASIN(1) = 90 |
ASINH | Returns the hyperbolic arc-sine in degrees | ASINH(6.76) = 2.61 |
ASINR | Returns the arc-sine in radians | ASINR(1) = 1.57 |
ATAN | Returns the arc-tangent in degrees | ATAN(1) = 45 |
ATAN2 | Returns the angle of arc-tangent2 in degrees | ATAN2(1, 0) = 90 |
ATAN2R | Returns the angle of arc-tangent2 in radians | ATAN2R(1, 0) = 1.57 |
ATANH | Returns the hyperbolic arc-tangent in degrees | ATANH(0.5) = 0.54 |
ATANR | Returns the arc-tangent in radians | ATANR(1) = 0.78 |
COS | Returns the cosine in degrees | COS(180) = -1 |
COSH | Returns the hyperbolic cosine in degrees | COSH(PI) = 11.591 |
COSR | Returns the cosine in radians | COSR(PI) = -1 |
COT | Returns the co-tangent in degrees | COT(45) = 1 |
COTH | Returns the hyperbolic co-tangent in degrees | COTH(PI) = 1.003 |
COTR | Returns the co-tangent in radians | COTR(0.785) = 1 |
CSC | Returns the co-secant in degrees | CSC(270) = -1 |
CSCH | Returns the hyperbolic co-secant in degrees | CSCH(3*PI/2) = 0.017 |
CSCR | Returns the co-secant in radians | CSCR(3*PI/2) = -1 |
DEG | Converts an angle from radians to degrees | DEG(0.785) = 45 |
RAD | Converts an angle from degrees to radians | RAD(45) = 0.785 |
SIN | Returns the sine in degrees | SIN(150) = 0.5 |
SINH | Returns the hyperbolic sine in degrees | SINH(2.61) = 6.762 |
SINR | Returns the sine in radians | SINR(2.61) = 0.5 |
SEC | Returns the secant in degrees | SEC(120) = -2 |
SECH | Returns the hyperbolic secant in degrees | SECH(2.09) = 0.243 |
SECR | Returns the secant in radians | SECR(2.09) = -2 |
TAN | Returns the tangent in degrees | TAN(360) = 0 |
TANH | Returns the hyperbolic tangent in degrees | TANH(2*PI) = 1 |
TANR | Returns the tangent in radians | TANR(2*PI) = 0 |
STR_CONTAINS | Returns true if string contains substring or false if not (case sensitive) | STR_CONTAINS("Hoy es", "Hoy") = true |
STR_LOWER | Converts all the characters of a string to upper case | STR_LOWER("HOY ES") = hoy es |
STR_UPPER | Converts all the characters of a string to upper case | STR_UPPER("hoy es") = HOY ES |
Expression Configuration
The expression evaluation can be configured to enable and disable specific features.
Feature | Description | Defaults |
---|---|---|
allowOverwriteConstants | Allows variables to have the name of a constant | True |
arraysAllowed | Allows array index functions | True |
dataAccessorSupplier | The Data Accessor is responsible for storing and retrieving variable values. You can define your own data access interface, by defining a class that implements the DataAccessorInterface | MapBasedDataAccessor |
decimalPlacesRounding | Specifies the amount of decimal places to round to in each operation or function | Disabled |
defaultConstants | Specifies the default constants that can be used in every expression | ExpressionConfiguration.StandardConstants |
functionDictionary | The function dictionary is used to look up the functions that are used in an expression. You can define your own function directory, by defining a class that implements the FunctionDictionaryIfc | MapBasedFunctionDictionary |
implicitMultiplicationAllowed | Allows for automatic multiplication without operators | True |
mathContext | Specifies the precision and rounding method | Precision: 68, Mode: HALF-EVEN |
operatorDictionary | The operator dictionary is used to look up the functions that are used in an expression. You can define your own operator directory, by defining a class that implements the OperatorDictionaryIfc | MapBasedOperatorDictionary |
powerOfPrecedence | Allows changes to the operation precedence | Lower precedence |
stripTrailingZeros | Allows the trailing decimal zeros in a number result to be stripped | True |
structuresAllowed | Specifies if the structure separator (‘.’) operator is allowed | True |
Custom Functions
Personalized functions can be added with the expression configuration. For this process a new class that extends AbstractFunction. In this class @FunctionParameter
tags must be added to class. These will signify the parameters needed to use the function. Lastly, an override to the evaluate method is needed to implement the custom function's logic. This functions can be called recursively.
This class is an example of a basic custom function which adds 5 to the parameter provided.
@FunctionParameter(name = "value")
public class AddFiveFunction extends AbstractFunction {
@Override
public EvaluationValue evaluate(Expression expression, Token functionToken, EvaluationValue... parameterValues) throws EvaluationException {
EvaluationValue value = parameterValues[0];
return new EvaluationValue(value.getNumberValue().doubleValue()+5.0);
}
}
The function is then added to the evaluated expression via an ExpressionConfiguration
object at evaluation time.
ExpressionConfiguration configuration = ExpressionConfiguration.
defaultConfiguration().withAdditionalFunctions(
Map.entry("ADDFIVE", new AddFiveFunction()));
var input = "ADDFIVE(5)";
var expression = new Expression(input, configuration);
var rawResult = expression.evaluate();
int finalResult = rawResult.getNumberValue().intValue();
Custom Operators
Much like functions, custom operators can be added. To do this, a new class that extends AbstractOperator
is needed. A tag must be added to specify if the operator must be used as prefix, postfix or infix. The tag also specifies the precedence using a value for comparison. The value of the other operators can be seen in the OperatorIfc
for reference. If no value is specified, the operator will have the highest precedence.
This class is an example of a basic custom function which adds 5 to the parameter provided.
@PrefixOperator(precedence = 1)
public class TimesThreeOperator extends AbstractOperator {
@Override
public EvaluationValue evaluate(Expression expression, Token operatorToken, EvaluationValue... operands) throws EvaluationException {
return new EvaluationValue(operands[0].getNumberValue().intValue()*3);
}
}
The operator is then added to the evaluated expression via an ExpressionConfiguration
object at evaluation time.
ExpressionConfiguration configuration = ExpressionConfiguration.
defaultConfiguration().withAdditionalOperators(
Map.entry("@", new TimesThreeOperator()));
var input = "@3";
Expression expression = new Expression(input, configuration);
return expression.evaluate();
EvalEx Utils
The EvalExUtils
class contains methods made to facilitate the processes that may use the Expression Language evaluator.
Array Converters
When an array is evaluated all of its contents are converted to an EvaluationValue
object. This renders the resulting array useless if not processed. To simplify this, 4 array converters were included to convert an EvaluationValue
array to an Integer, String, Boolean or plain Object array.
Added Functions
System Function
SystemFunction
gets a System property or Environment Variable value given it's key. The System property has precedence. If no System property or Environment variable is found, an EvaluationException
is thrown with the message "No such system parameter or environment variable". It is used with the SYSTEM
tag. E.g. SYSTEM(java.runtime.version) = 17.0.6+10 (result may vary).
Encrypt and Decrypt Functions
EncryptFunction
encrypts the given content using the encryption method in CryptoUtils
. It is used with the ENCRYPT
tag. E.g. ENCRYPT("Test") = (Encrypted string for "Test"). DecryptFunction
decrypts an encrypted content using the decryption method in CryptoUtils
. It is used with the DECRYPT
tag. E.g. DECRYPT(Encrypted string for "Test") = Test.
Regex Match Function
RegexMatchFunction
returns a boolean specifying if a string matches a given pattern. It is used with the REGEX_MATCH
tag. E.g. REGEX_MATCH("This is a Test", ".t") = true.
Ends With and Starts with Functions
EndsWithFunction
verifies with a boolean if a string ends with a given substring. It is used with the STR_ENDSWITH
tag. E.g. STR_ENDSWITH("This is a test","test") = true. StartsWithFunction
verifies with a boolean if a string starts with a given substring. These functions are case-sensitive. It is used with the STR_STARTSWITH
tag. E.g. STR_STARTSWITH("This is a test","This") = true.
Is Empty and Is Blank Functions
IsBlankFuction
verifies with a boolean if a string is blank. It is used with the tag STR_ISBLANK
tag. E.g. STR_ISBLANK(" ") = true. IsEmptyFunction
verifies with a boolean if a string is empty. It is used with the tag STR_ISEMPTY
tag. E.g. STR_ISEMPTY("") = true.
Split Function
SplitFunction
splits a given string by a given token. The resulting array contains EvaluationValue
objects, therefore the substrings must be converted back to strings. It is used with the STR_SPLIT
tag. E.g. STR_SPLIT("This is a Test", "") = {"This", "is", "a", "test"}.
Concat Function
ConcatFunction
concatenates any given set of strings. It is used with the STR_CONCAT
tag. E.g. STR_CONCAT("This", "is", "a", "test") = "This is a Test".
Array Join Function
ArrayJoinFunction
concatenates two given arrays of the same type into one array. It is used with the ARR_JOIN
tag. The resulting array will be an Object[], therefore it must be processed with the arrayFactoryProcessor
. E.g. JOIN(list1, list2) = (1, 2, 3, 4, 5, 6) with list1 = (1, 2, 3) and list2 = (4, 5, 6) being variables.
Array Contains Function
ArrayContainsFunction
verifies if an array contains a given value. It is used with the ARR_CONTAINS
tag. E.g. ARR_CONTAINS(array, 1) = true, with array = (1, 2, 3) being a variable.
Get By Index Function
GetByIndexFunction
provides the value in the given index. It is used with the ARR_GET
tag. E.g. ARR_GET(array, 0) = "This", with array = ("This", "is", "a", "test") being a variable.
Array Is Empty Function
ArrayIsEmptyFunction
provides a boolean specifying if an array is Empty. It is used with the ARR_ISEMPTY
tag. E.g. ARR_ISEMPTY(array) = true, with array = () being a variable.
Array Size Function
ArraySizeFunction
provides the value in the given index. It is used with the ARR_SIZE
tag. E.g. SIZE(array) = 3, with array = (1, 2, 3) being a variable.
Get By Key Function
GetByKeyFunction
returns the value assigned to the value given by parameter. Because of the evaluation process, the key should be passed as a string even when the actual key is a number. It is used with the MAP_GET
tag. E.g. MAP_GET(map, "1") = "Test" where <Integer, String> map = (1, "Test") ia a variable
Map Contains Key Function
MapContainsKeyFunction
provides a boolean specifying if a map contains a given key. Because of the evaluation process, the key should be passed as a string even when the actual key is a number. It is used with the MAP_CONTAINS
tag. E.g. MAP_CONTAINS(map, "1") = true where <Integer, String> map = (1, "Test") ia a variable.
Map Is Empty Function
MapIsEmptyFunction
verifies with a boolean if a map is empty. It is used with the tag MAP_ISEMPTY
tag. E.g. MAP_ISEMPTY(map) = true where map = () ia a variable.
Map Size Function
MapIsEmptyFunction
returns the size of a map. It is used with the tag MAP_SIZE
tag. E.g. MAP_SIZE(map) = 1 where <Integer, String> map = (1, "Test") ia a variable.`
File Read Function
FileReadFunction
reads a file using the find utility in FileStorageService
. It is used with the FILE_FIND
tag.
HTTP Base Client
The HTTP Base Client is an abstraction of the OkHttpClient. It provides features such as retry handlers (see Retries) and automated paginated requests.
The minimum initialization for the client defines the configuration of the HTTP connections and other shared properties to be used by the endpoints:
public class MyClient extends HttpBaseClient {
protected MyClient(
@NonNull OkHttpClient client,
@NonNull ObjectMapper objectMapper,
@NonNull BackoffPolicyProperties backoffPolicy,
MeterRegistry meterRegistry,
@NonNull ScrollProperties scroll)
{
super(client, objectMapper, backoffPolicy, scroll);
}
@Override
public boolean ping() {
return true;
}
public static Builder builder() {
return new Builder();
}
public static final class Builder extends HttpBaseClientBuilder<Builder, MyClient> {
@Override
public MyClient build() {
return new MyClient(
newClientBuilder().build(),
getObjectMapper(),
getBackoffPolicy(),
getScroll());
}
}
}
The builder can be also initialized with a HttpBaseClientProperties
instance:
public class MyClient extends HttpBaseClient {
...
public static Builder builder(HttpBaseClientProperties clientProperties) {
return new Builder(clientProperties);
}
@NoArgsConstructor
public static final class Builder extends HttpBaseClientBuilder<Builder, MyClient> {
protected Builder(HttpBaseClientProperties clientProperties) {
super(clientProperties);
}
...
}
}
If some configurations are not desired on the implemented client, they can be disabled in the builder:
public class MyClient extends HttpBaseClient {
...
@HttpBaseClientBuilder.DisabledConfig({
HttpBaseClientBuilder.Config.COMPRESS_REQUESTS,
HttpBaseClientBuilder.Config.FOLLOW_REDIRECTS,
HttpBaseClientBuilder.Config.SCROLL
})
public static final class Builder extends HttpBaseClientBuilder<Builder, MyClient> {
...
}
}
any attempt to set a disabled configuration will throw an UnsupportedOperationException
.
By default, the retry of the executeWithBackoff(...)
method will happen if the HTTP response code is:
- 408 - Request Time Out
- 429 - Too Many Request
- 500 - Internal Server Error
- 504 - Gateway Timeout
The condition can be changed by sending a different predicate to the method:
client.executeWithBackoff(requestDetails, response -> false);
In this case, the request will not be retried because of a status code (although it might be retried because of an exception during the execution).
HTTP Round Robin Client
The HTTP Round Robin Client is a specialization of the HttpBaseClient
backed up by a round-robin collection. Provides the corresponding classes as described in the previous section: HttpRoundRobinClient
, HttpRoundRobinClientBuilder
, and HttpRoundRobinClientProperties
.
HTTP Client Request
The request object is a simple POJO with the properties needed to execute the HTTP call:
@RequiredArgsConstructor
public class MyRequest implements HttpClientRequest {
private final String id;
}
For cases when no data is needed in the request, an empty implementation is avaliable in
HttpRoundRobinClient#EMPTY_REQUEST
HTTP Client Pageable Request
The HttpClientPageableRequest
interface is an extension of HttpClientRequest
and defines default methods to handle pagination parameters.
HTTP Client Response
Single Response
Expected when the response is a single payload.
public class MySinglePayloadResponse extends AbstractSingleResponse<MyRequest> {
public MySinglePayloadResponse(
ObjectMapper objectMapper,
HttpClientRequestDetails<MyRequest> requestDetails,
Response httpResponse
) {
super(objectMapper, requestDetails, httpResponse);
}
@Override
protected void handleSuccess(Response httpResponse) {
// Do something with the response
}
}
With this definition, the endpoint can be exposed in the client:
public class MyClient extends HttpRoundRobinClient {
...
public MySinglePayloadResponse get(MyRequest clientRequest) {
var url = serverHosts.next()
.newBuilder()
.addPathSegment("content")
.addPathSegment(request.getId())
.build();
var requestDetails = new HttpClientRequestDetails<>(
new Request.Builder().url(url).get().build(),
clientRequest);
return new MySinglePayloadResponse(
objectMapper,
requestDetails,
executeWithBackoff(requestDetails)
);
}
...
}
Both handleSuccess(Response)
and handleFailure(Response)
have a default implementation that can be overridden. A response is considered a success if its HTTP status code is in the 200 to 299 range and its body is not null
(not to be confused with an empty body). This check can be changed in the constructor:
public class MySinglePayloadResponse extends AbstractSingleResponse<MyRequest> {
public MySinglePayloadResponse(
ObjectMapper objectMapper,
HttpClientRequestDetails<MyRequest> requestDetails,
Response httpResponse
) {
super(objectMapper, requestDetails, httpResponse, r -> r.code() == 400);
}
}
If the response was a failure, but it had a payload, it can be retrieved with AbstractSingleResponse#getErrorMessage()
, or AbstractSingleResponse#getErrorMessage(Class)
(to convert the payload into the given class type).
Iterable Response
Expected when the response is a full collection of iterable elements with no pagination. It is assumed that the payload of the response is a JSON array (if that's not the case, the handleSuccess(Response httpResponse)
method must be overridden).
public class MyIterableResponse extends AbstractIterableResponse<MyRequest, String> {
public MyIterableResponse(
ObjectMapper objectMapper,
HttpClientRequestDetails<MyRequest> requestDetails,
Response httpResponse
) {
super(objectMapper, requestDetails, httpResponse);
}
@Override
protected String parseElement(JsonNode jsonNode) {
return jsonNode.asText();
}
}
With this definition, the endpoint can be exposed in the client:
public class MyClient extends HttpRoundRobinClient {
...
public MyIterableResponse list(MyRequest clientRequest) {
var url = serverHosts.next()
.newBuilder()
.addPathSegment("list")
.addPathSegment(request.getId())
.build();
var requestDetails = new HttpClientRequestDetails<>(
new Request.Builder().url(url).get().build(),
clientRequest);
return new MyIterableResponse(
objectMapper,
requestDetails,
executeWithBackoff(requestDetails)
);
}
...
}
Each element in the response can then be retrieved by using the Iterable<>
interface.
The predicate to determine if a response was successful can be provided in the constructor, the same way as in the previous sections.
Paginated Response
Token Page
Expected when the response is a consecutive list of pages obtained with a token.
Token Page Response
The response corresponds to each page from the pagination requests. Since it is not possible to automatically determine where is the token for the next page, the AbstractTokenPageResponse#handleSuccess(Response)
method must be implemented.
public class MyTokenPageResponse extends AbstractTokenPageResponse<MyRequest, String> {
public MyTokenPageResponse(
ObjectMapper objectMapper,
HttpClientRequestDetails<MyRequest> requestDetails,
Response httpResponse,
String currentToken
) {
super(objectMapper, requestDetails, httpResponse, currentToken);
}
@Override
protected void handleSuccess(Response response) {
try {
var json = objectMapper.readTree(Objects.requireNonNull(response.body()).byteStream());
this.nextPageReference = json.get("token").asText();
if (json.has("content")) {
this.elements = new JsonLazyIterator<>(json.get("content"), this::parseElement);
} else {
this.elements = Collections.emptyIterator();
}
} catch (Exception ex) {
throw new DataException(ex.getMessage(), ex);
}
}
@Override
protected String parseElement(JsonNode jsonNode) {
return jsonNode.asText();
}
}
The predicate to determine if a response was successful can be provided in the constructor, the same way as in the previous sections.
Token Page Collection
The response corresponds to a collection of all the page responses that iterates through the data.
public class MyTokenPageCollectionResponse extends AbstractTokenCollectionResponse<MyRequest, String, MyTokenPageResponse> {
public MyTokenPageCollectionResponse(Function<String, MyTokenPageResponse> pageFunction) {
super(pageFunction);
}
}
With this definition, the endpoint can be exposed in the client:
public class MyClient extends HttpRoundRobinClient {
...
public MyTokenPageCollectionResponse paginatedByToken(MyRequest clientRequest) {
return new MyTokenPageCollectionResponse(
token -> {
var urlBuilder = serverHosts.next()
.newBuilder()
.addPathSegment("list")
.addPathSegment(request.getId())
.addQueryParameter("limit", String.valueOf(scroll.getSize()));
if (token != null) {
urlBuilder.addQueryParameter("token", token);
}
var requestDetails = new HttpClientRequestDetails<>(
new Request.Builder().url(urlBuilder.build()).get().build(),
clientRequest);
return new MyTokenPageResponse(
objectMapper,
requestDetails,
executeWithBackoff(requestDetails),
token
);
}
);
}
...
}
Note that in case of a null
token, the function must return the first page.
Each element in the response can then be retrieved by using the Iterable<>
interface. The pagination will be handled behind the scenes.
Offset Page
Expected when the response is a consecutive list of pages obtained with a page number.
Offset Page Response
The response corresponds to each page from the pagination requests. It is assumed that the payload of the response is a JSON array (if that's not the case, the handleSuccess(Response httpResponse)
method must be overridden).
It is also assumed that the AbstractOffsetPageResponse#getNextPageReference()
is always 1 after the current page. This method can be overridden for custom behaviors.
public class MyOffsetPageResponse extends AbstractOffsetPageResponse<MyRequest, String> {
public MyOffsetPageResponse(
ObjectMapper objectMapper,
HttpClientRequestDetails<MyRequest> requestDetails,
Response httpResponse,
Long currentOffset
) {
super(objectMapper, requestDetails, httpResponse, currentOffset);
}
@Override
protected String parseElement(JsonNode jsonNode) {
return jsonNode.asText();
}
}
The predicate to determine if a response was successful can be provided in the constructor, the same way as in the previous sections.
Pageable Page Response
The response is a specialization of the Offset Page Response, and it is based on a default page structure:
{
"totalPages": 0,
"totalElements": 0,
"size": 0,
"content": [
{}
],
"number": 0,
"first": true,
"last": true,
"sort": {
"sorted": true,
"unsorted": true,
"empty": true
},
"numberOfElements": 0,
"pageable": {
"page": 0,
"size": 0,
"sort": [
"string"
]
},
"empty": true
}
Offset Page Collection
The response corresponds to a collection of all the page responses that iterates through the data.
public class MyOffsetPageCollectionResponse extends AbstractOffsetCollectionResponse<MyRequest, String, MyOffsetPageResponse> {
public MyOffsetPageCollectionResponse(Function<String, MyOffsetPageResponse> pageFunction) {
super(pageFunction);
}
}
With this definition, the endpoint can be exposed in the client:
public class MyClient extends HttpRoundRobinClient {
...
public MyOffsetPageCollectionResponse paginatedByOffset(MyRequest clientRequest) {
return new MyOffsetPageCollectionResponse(
offset -> {
var urlBuilder = serverHosts.next()
.newBuilder()
.addPathSegment("list")
.addPathSegment(request.getId())
.addQueryParameter("limit", String.valueOf(scroll.getSize()));
if (token != null) {
urlBuilder.addQueryParameter("page", String.valueOf(offset));
}
var requestDetails = new HttpClientRequestDetails<>(
new Request.Builder().url(urlBuilder.build()).get().build(),
clientRequest);
return new MyOffsetPageResponse(
objectMapper,
requestDetails,
executeWithBackoff(requestDetails),
offset
);
}
);
}
...
}
Note that in case of a null
token, the function must return the first page.
Each element in the response can then be retrieved by using the Iterable<>
interface. The pagination will be handled behind the scenes.
JSON
JSON handling is done with Jackson Project, and the default configuration for the mapper can be instantiated with com.pureinsights.pdp.core.config.CoreSerializer.newInstance()
.
Default Object Mapper
Registered Modules
JavaTimeModule
to handle the JSR-310: Date and Time API
Registered Deserializers
com.pureinsights.pdp.core.time.CoreDurationDeserializer
as an extension of the defaultDurationDeserializer
to allow values in human-readable format (i.e. "1s", "12m", "5h"...). If no suffix is provided, milliseconds are assumed. The supported suffixes are:d
- daysh
- hoursm
- minutess
- secondsms
- millisecondsns
- nanoseconds
Enabled Features
MapperFeature.ACCEPT_CASE_INSENSITIVE_ENUMS
MapperFeature.ACCEPT_CASE_INSENSITIVE_VALUES
DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY
Disabled Features
SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS
SerializationFeature.WRITE_DATES_AS_TIMESTAMPS
SerializationFeature.WRITE_DATE_TIMESTAMPS_AS_NANOSECONDS
DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES
DeserializationFeature.ADJUST_DATES_TO_CONTEXT_TIME_ZONE
DeserializationFeature.READ_DATE_TIMESTAMPS_AS_NANOSECONDS
Logging Handler
The logging handler simplifies the process of log analysis by using a JSON output with support for custom POJOs.
Logback Configuration
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="net.logstash.logback.encoder.LogstashEncoder">
<provider class="com.pureinsights.pdp.core.logging.CoreArgumentsProvider"/>
</encoder>
</appender>
<root level="info">
<appender-ref ref="STDOUT"/>
</root>
</configuration>
Custom POJO
@Getter
public class MyPOJO {
private final UUID id = UUID.randomUUID();
}
public class MyPOJOLoggingHandler implements PojoLoggingHandler<MyPOJO> {
private static final String PARENT_LABEL = "myPOJO";
private static final String ID_LABEL = "id";
@Override
public Class<MyPOJO> getSupportedType() {
return MyPOJO.class;
}
@Override
public void write(JsonGenerator generator, MyPOJO pojo) throws IOException {
generator.writeFieldName(PARENT_LABEL);
generator.writeStartObject();
generator.writeObjectField(ID_LABEL, pojo.getId());
generator.writeEndObject();
}
}
In order to invoke the handler, the class type can call the logging framework:
log.info("Message", new MyPOJO());
log.info("Message {}", new MyPOJO());
log.info("Message {}", "with placeholder", new MyPOJO());
{"@timestamp":"2021-12-08T15:07:11.379Z","@version":1,"message":"Message","logger_name":"LoggerTest","thread_name":"Test worker","level":"INFO","level_value":20000,"myPOJO":{"id":"707ea126-d94b-47b6-9010-72a3acc8a786"}}
{"@timestamp":"2021-12-08T15:07:11.380Z","@version":1,"message":"Message MyPOJO$373141eb","logger_name":"LoggerTest","thread_name":"Test worker","level":"INFO","level_value":20000,"myPOJO":{"id":"398bb1df-29b3-4738-afc6-a7b9e892ce19"}}
{"@timestamp":"2021-12-08T15:07:11.381Z","@version":1,"message":"Message with placeholder","logger_name":"LoggerTest","thread_name":"Test worker","level":"INFO","level_value":20000,"myPOJO":{"id":"00ba3228-f434-4ed0-b45e-62ec9163aac4"}}
Note that the custom POJO is always displayed as expected. The example also demonstrates that the POJO must not have a corresponding placeholder in the message as it will be automatically resolved by the provider. Exceptions must be the in the last position of the input:
...
} catch (Exception ex) {
log.error(ex.getMessage(), new MyPOJO(), ex);
}
If the logging handler is part of the PDP Core Common Libraries project, it must be registered in the static section of the com.pureinsights.pdp.core.logging.CoreArgumentsProvider
class.
Any other logging handler under an application context, must be declared as a bean (i.e. @Singleton
).
If none of the above is applicable, the handler must be registered using the CoreArgumentsProvider#registerHandler(PojoLoggingHandler)
method.
Metrics
The metrics of the application are collected and exposed by Micrometer. They are enabled by default in the application.yml
and exposed through the GET /metrics
and GET /metrics/[name]
endpoints:
micronaut:
metrics:
enabled: true
{
"names": [
"http.server.requests",
"jvm.buffer.count",
"jvm.buffer.memory.used",
...,
"pdp.metric.count"
]
}
When available, the metrics can be filtered with the use of tags:
GET /metrics/pdp.metric.count?tag=source:component&tag=type:value1
By default, all metrics include the
component
tag with the name of the application and thehostname
tag with the domain name of the IP address where the service is running. Additionally, anenv
tag with the name of the environment will be included if the value is set on theENV_NAME
environment variable
Annotation-based metrics
Any method in any bean can be measured with:
@Timed
@Counted
@Singleton
public class MyService {
@Timed(value = "pdp.myservice.execute", histogram = true)
public void execute(UUID id) {
...
}
@Counted("pdp.myservice.validate")
public void validate(UUID id) {
...
}
}
The name of the metric must be in lowercase, using dot notation
Custom metrics
Instead of using annotations, the metric registry can be injected and manually handled in any bean, with any type of meter:
@Singleton
public class MyCustomService {
private Counter counter;
@PostConstruct
void init(MeterRegistry meterRegistry) {
counter = Counter.builder("pdp.mycustomservice.count")
.tag("key", "value")
.register(meterRegistry);
}
public void count() {
counter.increment();
}
}
Retries
Retries are managed by resilience4j, see more. There is a backoff policy for retries: pdp.core-common.config.BackoffPolicyProperties
, example of implementation:
final var random = new Random();
final int upperbound = 25;
final var backoffPolicy = new BackoffPolicyPropertiesImpl();
// Retry configuration
Predicate<Integer> isOdd = number -> number % 2 == 0;
var retryConfig = backoffPolicy.<Integer>getRetryConfigBuilder()
.retryOnResult(isOdd)
.failAfterMaxAttempts(true)
.build();
var retry = Retry.of("Get odd number", retryConfig);
return Retry.decorateFunction(retry, f -> random.nextInt(upperbound)).apply(isOdd);
Scheduler
Scheduled tasks are handled by Quartz. Any application can inject the org.quartz.Scheduler
bean to register/unregister its own cron jobs.
Any execution of a org.quartz.Job
implementation will automatically perform the dependency injection of its visible fields without the need of declaring the class as a singleton.
Message Queue Provider
To handle communication between the different components, PDP uses a message queue. The following is a quick overview of the available configuration fields, examples on how to use the Message Queue classes and the current implementations available with configuration examples.
Configuration
Basic Configuration
Property | Type | Default | Description |
---|---|---|---|
messageQueue.provider | String | The name of the MessageQueue provider to use. (e.g. rabbitmq to load the RabbitMQ provider). |
Using the MessageQueueProvider
The MessageQueueProvider instance can be injected as follows:
public class MyClass {
@Inject
MessageQueueProvider mqProvider;
...
Messages
Message Properties
Property | Type | Default | Description |
---|---|---|---|
body | byte[] | The body of the message. | |
queue | String | The queue this message was sent to or delivered from. | |
type | Message.MessageType | The type of the message. Can be DIRECT or BROADCAST. See Message Types. | |
properties | Map<String,String> | Map of message properties. |
Message Types
Direct Messages
Direct messages are sent to a given queue and are meant to be consumed by a single consumer. The message is persisted in the queue until a consumer is available.
Broadcast Messages
Broadcast messages are sent to a given queue and are meant to be consumed by all consumers available at the moment of sending. The message is not persisted if no consumers are available.
Creating a new Message
The Message class provides a builder to simplify the message creation process:
Message newMessage = Message.builder()
.body("My message")
.type(Message.MessageType.DIRECT)
.queue("queue")
.property("key", "value")
.build();
Some considerations with the Message builder:
- There are multiple ways to set the message body. If more than one is set, only the last one will be used.
- Every time the
property
method is called, the given property is appended to the map. - Calling
properties
method will overwrite any previously set properties.
Consumers
Configuration
Property | Type | Default | Description |
---|---|---|---|
consumer.{name}.maxMessages | Integer | 5 | Maximum number of messages to consume in parallel. |
consumer.{name}.maxMessageRequeues | Integer | 5 | Maximum number of times a message should be sent back to the queue for retry. |
Example
messageQueue:
consumer:
directConsumer:
maxMessages: 4
maxMessageRequeues: 6
This configuration can be loaded as follows
protected void myMethod(@Named("direct-consumer") ConsumerProperties consumerProperties) {
try {
var maxRequeues = consumerProperties.getMaxMessageRequeues();
...
}
}
Registering Consumers
To register a consumer do the following:
public class MyClass {
@Inject
MessageQueueProvider mqProvider;
...
protected void myMethod(@Named("direct-consumer") ConsumerProperties consumerProperties) {
try {
mqProvider.registerConsumer(queue, consumerProperties, Message.MessageType.DIRECT, this::consumeMessage);
...
}
}
The registerConsumer
method takes the following parameters:
Parameter | Type | Description |
---|---|---|
queue | String | Name of the queue to register the Consumer to. |
consumerProperties | ConsumerProperties | A ConsumerProperties instance for the consumer. |
messageType | Message.MessageType | The type of message this consumer is supposed to listen to. Can be Message.MessageType.DIRECT or Message.MessageType.BROADCAST. See Message Types. |
onMessageDelivery | Predicate<Message> | The function to execute when a message is received. Returns true if the message is successfully consumed, false otherwise. |
If the consumer should execute an action when the maximum number of requeues has been exceeded use the following:
public class MyClass {
@Inject
MessageQueueProvider mqProvider;
...
protected void myMethod(@Named("direct-consumer") ConsumerProperties consumerProperties) {
try {
mqProvider.registerConsumer(queue, consumerProperties, Message.MessageType.DIRECT, this::consumeMessage, this::requeueConsumeMessage);
...
}
}
It adds the following parameter to the previous method:
Parameter | Type | Description |
---|---|---|
onMessageRequeueConsumer | Consumer<String> | The function to execute when the maximum requeue has been exceeded. The function's parameter is a String containing the failed message's body. |
Producers
Configuration
Property | Type | Default | Description |
---|---|---|---|
producer.threads | Integer | 5 | Number of threads available for producers to send asyncMessages. |
producer.sendMessageTimeout | Duration | 30s | Maximum time to wait to get a confirmation that a message has been successfully sent. |
producer.retry.retries | Integer | 5 | Maximum number of times to try to send a message. |
producer.retry.delay | Duration | 1s | Time to wait before retrying. The time is multiplied by the number of retries to backoff between executions. |
Example
messageQueue:
producer:
sendMessageTimeout: 10000
threads: 10
retry:
retries: 10
delay: 10000
This configuration can be loaded by injecting the ProducerProperties instance:
public class MyClass {
@Inject
ProducerProperties producerProperties;
...
Getting a Producer instance
To get a new producer instance do the following:
public class MyClass {
@Inject
MessageQueueProvider mqProvider;
...
protected void myMethod() {
try {
var myProducer = mqProvider.getProducer();
...
}
}
Sending a message
Async Messages
producer.asyncSend(newMessage,
(message) -> messageSuccessHandler(message),
(message, exception) -> messageFailureHandler(message, exception));
The asyncSend
method takes the following parameters:
Parameter | Type | Description |
---|---|---|
message | Message | The Message to send. |
successHandler | Consumer<Message> | A consumer function to execute when the message is succesfully sent. |
failureHandler | BiConsumer<Message, Exception> | A biconsumer function to execute when there is an error sending the message and the retries have been exhausted. |
Sync Messages
producer.syncSend(newMessage);
The syncSend
method takes the following parameters:
Parameter | Type | Description |
---|---|---|
message | Message | The Message to send. |
Providers
RabbitMQ
To enable the RabbitMQ provider, set the messageQueue.provider
property with rabbitmq
as value.
Configuration
Property | Type | Default | Description |
---|---|---|---|
host | String | localhost | The RabbitMQ host to connect to. |
port | Integer | 5672 | The port to connect to. |
user | String | The user in case authentication is required | |
password | String | The password for the user | |
virtualHost | String | / | The RabbitMQ virtual host to connect to. |
uri | String | The complete RabbitMQ url. For example [amqps://user:pass@localhost:5671/%2f]. Setting this property will overwrite host , port , user , password and virtualHost . | |
exchangeName | String | pdp.exchange | The name of the exchange to send the messages to. |
tls.enabled | Boolean | false | Whether TLS is enabled or not on RabbitMQ. This assumes the required certificates come from a known CA or have already been registered into the JVM cacerts keystore. |
tls.tlsVersion | String | TLSv1.2 | TLS version being used. |
tls.p12KeyPath | String | Path to the P12 key. | |
tls.p12KeyPassphrase | String | Passphrase for the P12 key. |
RabbitMQ Producer Configuration
The following are the RabbitMQ specific configuration properties for a producer.
Property | Type | Default | Description |
---|---|---|---|
minIdleChannels | Integer | 5 | The minimum number of idle channels to leave on the channel pool. |
maxIdleChannels | Integer | 10 | The maximum number of idle channels to leave on the channel pool. |
maxOpenChannels | Integer | 20 | The maximum number of open channels to have at the same time. This includes idle and in use channels. |
Configuration Example
messageQueue:
provider: rabbitmq
uri: amqps://user:pass@localhost:5671/%2f
exchangeName: testExchange
tls:
enabled: true
p12KeyPath: C:/certs/rmq.p12
p12KeyPassphrase: rmqpass
producer:
minIdleChannels: 2
maxIdleChannels: 4
maxOpenChannels: 6
sendMessageTimeout: 10000
threads: 10
retry:
retries: 10
delay: 10000
consumer:
directConsumer:
maxMessages: 4
mmaxMessageRequeues: 6
broadcastConsumer:
maxMessages: 3
maxMessageRequeues: 6
Circuit Breaker
Overview
The idea behind a circuit breaker is to wrap a protected function call in a circuit breaker object, which monitors for failures. Once the failures reach a certain threshold, the circuit is open, and further calls to the circuit return an error right away instead of executing the protected function. Some parameters then control when the circuit will close again and test whether the protected function can be executed without errors. A more in-depth explanation on the general concept can be found here
This core library provides a wrapper for the registry used in the Circuit Breaker pattern implementation offered by resilience4j. The main purpose of the wrapper is to simplify the configuration of a circuit breaker by exposing only a limited number of configuration parameters, and setting default values for the rest. These parameters values were chosen thinking of a use case where the circuit breaker prevents constant requests to an external API that has returned a 429 error, although it can be used in other scenarios. Of all the parameters found in the documentation linked above, the following are the ones exposed in the builder
of the CircuitBreakerRegistryWrapper
class, and thus are open to configuration:
The defaults listed in the table are those of the builder, which might differ from those of the library.
Parameter | Type | Default |
---|---|---|
failureRateThreshold | float | 50 |
slowCallDurationThreshold | Duration | 1 hour |
slowCallRateDurationThreshold | float | 100 |
slidingWindowSize | int | 2 |
waitDurationInOpenState | Duration | 60 seconds |
recordExceptions | List<Class<? extends Throwable>> | empty |
ignoreExceptions | List<Class<? extends Throwable>> | empty |
As for the other configuration parameters, the following values are used:
Parameter | Type | Value |
---|---|---|
permittedNumberOfCallsInHalfOpenState | int | 1 |
maxWaitDurationInHalfOpenState | Duration | 0 |
slidingWindowType | SlidingWindowType | COUNT_BASED |
automaticTransitionFromOpenToHalfOpenEnabled | boolean | true |
minimumNumberOfCalls | int | Same as slidingWindowSize |
recordFailurePredicate | throwable -> boolean | By default all exceptions are recorded as failures. |
ignoreExceptionPredicate | throwable -> boolean | By default no exception is ignored. |
Examples
Creating a default CircuitBreakerRegistryWrapper
, using it to create a circuit breaker, and then wrapping a function:
import com.pureinsights.pdp.core.circuitbreaker.CircuitBreakerRegistryWrapper;
import io.github.resilience4j.circuitbreaker.CallNotPermittedException;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
public class SomeClass {
// The protected function
public int doubleNumber(int param) {
return param * 2;
}
// Here's where the circuit breaker is used
public void callWithCircuitBreaker() {
// Build the wrapper
var circuitBreakerWrapper = CircuitBreakerRegistryWrapper.builder().build();
// Get a circuit breaker instance (the name provided must be unique for each wrapper)
var cb = circuitBreakerWrapper.getCircuitBreaker("test");
// Use the instance to decorate the function
var decorated = CircuitBreaker.decorateFunction(circuitBreaker, this::execute);
// Call the function
try {
var result = decorated.apply(5);
// The apply method will return an exception if the circuit is open
} catch (CallNotPermittedException callNotPermittedException) {
log.error("call not permitted");
}
}
}
Creating a CircuitBreakerRegistryWrapper
with all custom parameters:
import com.pureinsights.pdp.core.circuitbreaker.CircuitBreakerRegistryWrapper;
public class SomeClass {
public void foo() {
ArrayList<Class<? extends Throwable>> customRecordExceptions = new ArrayList<>(
Arrays.asList(IllegalArgumentException.class, TimeoutException.class));
ArrayList<Class<? extends Throwable>> customIgnoreExceptions = new ArrayList<>(
List.of(NullPointerException.class));
var circuitBreakerWrapper = CircuitBreakerRegistryWrapper.builder()
.failureRateThreshold(60)
.slidingWindowSize(10)
.slowCallDurationThreshold(Duration.ofMillis(120000))
.slowCallRateThreshold(80)
.waitDurationInOpenState(Duration.ofMillis(10000))
.ignoreExceptions(customIgnoreExceptions)
.recordExceptions(customRecordExceptions)
.build();
}
}
Encryption
PDP provides a utilities class called CryptoUtils that allows the user to encrypt and decrypt any given String.
import com.pureinsights.pdp.core.crypto.CryptoUtils;
public class EncryptedEntity {
String content;
public EncrytedEntity(String content) {
this.content = CryptoUtils.encrypt(content);
}
public String plainContent() {
return CryptoUtils.decrypt(content);
}
}
The encryption mechanism provided by the CryptoUtils uses a master password to generate the secret key. This master password can be configured setting the environment variable PDP_MASTER_PASSWORD
.
Warning For production environments always set your own master password. Never rely on the default one.
Secrets Service
The Secrets Service provides a way to store and retrieve encrypted information from a given repository.
Configuration
secretsService:
provider: <providerKey>
Configuration Parameters
provider
(Optional, String) Repository provider to store and retrieve secrets By default the storage
provider will be used.
Providers
Storage
Stores and retrieves secrets from the configured storage for PDP.
Configuration
secretsService:
provider: storage