Binary Data Service
The Binary Data Service is an abstraction that hides the complexity of storing and retrieving binary data using configurable providers.
It uses the concept of sessions bound to buckets to give a logical separation between different scopes. For example, "config" could be a bucket that stores all data related to the configurations scope, and the local file system provider would represent it as a folder.
@Inject
protected BinaryDataService binaryDataService;
protected BinaryDataServiceSession session;
@PostConstruct
void init() {
session = binaryDataService.newSession("config");
}
public void storeConfig(String key, InputStream inputStream) {
session.store(key, inputStream);
}
public Optional<InputStream> findConfig(String key) {
return session.find(key);
}
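To illustrate how a bucket can map to a folder, here is a minimal sketch of a file-backed session. The class name LocalBucketSession and its constructor are hypothetical and are not part of the service's API; the sketch only assumes that each bucket becomes a directory under the configured data path and that keys are plain file names.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Optional;

// Hypothetical stand-in for a session bound to one bucket (not the library's implementation).
class LocalBucketSession {
    private final Path bucketDir;

    LocalBucketSession(Path dataPath, String bucket) {
        try {
            // Assumption: each bucket maps to its own folder under the data path.
            this.bucketDir = Files.createDirectories(dataPath.resolve(bucket));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Writes the stream to a file named after the key, replacing any previous content.
    void store(String key, InputStream content) {
        try {
            Files.copy(content, bucketDir.resolve(key), StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns an open stream for the key, or an empty Optional if no file exists for it.
    Optional<InputStream> find(String key) {
        Path file = bucketDir.resolve(key);
        if (!Files.isRegularFile(file)) {
            return Optional.empty();
        }
        try {
            return Optional.of(Files.newInputStream(file));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The sketch does not validate keys (for example, keys containing path separators), and the caller is responsible for closing the returned stream.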
Providers
Local File System
The default provider. Uses the local file system, or any mounted drive, as the storage:
storage:
binary:
provider: local
dataPath: other
Configuration parameters:
storage.binary.provider
(Optional, String) If given, must be local.
storage.binary.dataPath
(Optional, String) The path where the data will be stored. Default is ${tmp}/binary, where ${tmp} is the default temporary folder of the OS where the application runs.
SFTP
Uses a remote SFTP server as storage:
storage:
binary:
provider: sftp
dataPath: other
server: sftp://user:pass@localhost:22
connection:
keepAliveInterval: 1m
connectTimeout: 30s
readTimeout: 30s
Keep in mind that SFTP is known to be slow. Use it only as a last resort.
Configuration parameters:
storage.binary.provider
(Required, String) Must be sftp.
storage.binary.dataPath
(Optional, String) The path where the data will be stored. Default is /binary.
storage.binary.server
(Optional, String) The URI to the SFTP server. Default is sftp://pdp:pdp@localhost:22.
storage.binary.connection.keepAliveInterval
(Optional, Duration) The interval to send a keep-alive signal to the server. Default is 30s.
storage.binary.connection.connectTimeout
(Optional, Duration) The timeout for the connection to the server. Default is 5s.
storage.binary.connection.readTimeout
(Optional, Duration) The timeout for reading from the server. Default is 3s.
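The storage.binary.server value is a URI that packs the credentials, host, and port into one string. As a rough sketch of how such a value breaks down, the following uses java.net.URI from the standard library. The class SftpServerUri is hypothetical, and falling back to port 22 (the standard SSH port) when none is given is an assumption, not documented behavior of the provider.

```java
import java.net.URI;

// Hypothetical helper that splits an sftp:// URI into its parts.
class SftpServerUri {
    final String user;
    final String password;
    final String host;
    final int port;

    private SftpServerUri(String user, String password, String host, int port) {
        this.user = user;
        this.password = password;
        this.host = host;
        this.port = port;
    }

    static SftpServerUri parse(String value) {
        URI uri = URI.create(value);
        if (!"sftp".equals(uri.getScheme())) {
            throw new IllegalArgumentException("Expected an sftp:// URI but got: " + value);
        }
        // The user info has the form "user:password"; the password part may be absent.
        String userInfo = uri.getUserInfo() == null ? "" : uri.getUserInfo();
        String[] credentials = userInfo.split(":", 2);
        String user = credentials[0];
        String password = credentials.length > 1 ? credentials[1] : "";
        // URI.getPort() returns -1 when no port is present; assume the standard SSH port.
        int port = uri.getPort() == -1 ? 22 : uri.getPort();
        return new SftpServerUri(user, password, uri.getHost(), port);
    }
}
```

Embedding a password in the URI is convenient for local testing, but special characters in credentials need to be percent-encoded for the URI to parse correctly.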
Cache Manager
The local cache is an extension to the one provided by Micronaut and implemented with Caffeine.
It simplifies the process of caching entities through annotations and configuration in the application.yml.
Configuration
storage:
cache:
local:
myEntity:
initialCapacity: 10
maximumSize: 50
maximumWeight: 100
expireAfterWrite: 1h
expireAfterAccess: 5m
recordStats: true
testMode: false
The example above configures a cache named myEntity. The following properties are available:
| Property          |   Type   | Default | Description |
| ----------------- | :------: | :-----: | ----------- |
| initialCapacity   | Integer  |   16    | The minimum size of the cache |
| maximumSize       | Long     |         | The maximum size of the cache. Cannot be combined with a weigher |
| maximumWeight     | Long     |         | The maximum weight to be allowed for an element in the cache (see Weigher section) |
| expireAfterWrite  | Duration |         | The time to wait to expire an element after its creation |
| expireAfterAccess | Duration |   5m    | The time to wait to expire an element after the last time it was accessed |
| recordStats       | boolean  |  true   | To record statistics about hit rates and evictions (see Cache Statistics section) |
| testMode          | boolean  |  false  | To execute all cache operations in a single thread |
Each cache must have a unique name, which will be automatically normalized to kebab-case (e.g. myEntity becomes my-entity).
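The kebab-case normalization can be approximated with a short helper. This is only a sketch of the naming rule described above: the class KebabCase is hypothetical, and the exact rules applied by the framework (for example, for acronyms or digits) may differ.

```java
import java.util.Locale;

// Hypothetical helper that approximates the camelCase to kebab-case normalization.
class KebabCase {
    static String normalize(String name) {
        return name
                // Insert a hyphen between a lowercase letter or digit and the uppercase letter that follows it.
                .replaceAll("([a-z0-9])([A-Z])", "$1-$2")
                // Treat underscores as word separators as well.
                .replace('_', '-')
                .toLowerCase(Locale.ROOT);
    }
}
```

With this rule, a name that is already in kebab-case is returned unchanged, so myEntity and my-entity refer to the same cache.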
A default configuration for a cache can be defined as a bean:
@Factory
@Requires(missingProperty = LocalCacheProperties.PREFIX + "." + MyEntityCacheConfig.CACHE_NAME)
public class MyEntityCacheConfig {
public static final String CACHE_NAME = "my-entity";
@Bean
@Named(CACHE_NAME)
LocalCacheProperties cacheProperties(ApplicationConfiguration applicationConfiguration) {
return LocalCacheProperties.builder()
.cacheName(CACHE_NAME)
.applicationConfiguration(applicationConfiguration)
.build();
}
}
Weigher
A weigher determines whether an element is too heavy to be kept in the cache. If a cache defines maximumWeight but has no weigher registered under its name, any other registered weigher will be selected. If no weigher is registered at all, a default weigher that assigns every element a weight of 1 will be used:
import com.github.benmanes.caffeine.cache.*;
@Singleton
@Named(MyEntityCacheConfig.CACHE_NAME)
public class MyEntityCacheWeigher implements Weigher<UUID, MyEntity> {
@Override
public @NonNegative int weigh(@NonNull UUID key, @NonNull MyEntity value) {
return 0;
}
}
Removal Listener
A listener that triggers every time an element is evicted:
import com.github.benmanes.caffeine.cache.*;
@Singleton
@Named(MyEntityCacheConfig.CACHE_NAME)
public class MyEntityCacheRemovalListener implements RemovalListener<UUID, MyEntity> {
@Override
public void onRemoval(@Nullable UUID key, @Nullable MyEntity value, @NonNull RemovalCause cause) {
// Do something with the event
}
}
Annotation-based caching
Any class (POJOs, Connections, Factories...) can be stored in the cache. For example, instances of the following entity type:
@Data
public class MyEntity implements CoreEntity<UUID> {
private UUID id;
private String name;
...
}
can be cached in any managed bean with the annotations from io.micronaut.cache.annotation (@Cacheable, @CachePut, and @CacheInvalidate):
@Singleton
@CacheConfig(MyEntityCacheConfig.CACHE_NAME)
public class MyEntityService {
@Inject
protected MyEntityRepository myEntityRepository;
@Cacheable(keyGenerator = CoreEntityKeyGenerator.class)
public List<MyEntity> getAll() {
return myEntityRepository.getAll();
}
@Cacheable
public MyEntity getOne(UUID id) {
return myEntityRepository.getOne(id);
}
@CachePut(keyGenerator = CoreEntityKeyGenerator.class)
public void store(MyEntity myEntity) {
myEntityRepository.store(myEntity);
}
@CacheInvalidate
public MyEntity delete(UUID id) {
return myEntityRepository.delete(id);
}
}
The key for the cacheable object must implement equals() and hashCode().
Note that for the getAll() and store(MyEntity) methods, a custom key generator needs to be specified. This way the cache will calculate the appropriate key for each entity. If no generator is defined, the DefaultCacheKeyGenerator is used.
The CoreEntityKeyGenerator can be used with any entity that implements CoreEntity<T>.
Multiple caches can be configured in the same @CacheConfig, in which case the name of the cache to use must be specified. Likewise, the key for the cached value can be a composite of multiple objects (internally wrapped as a ParametersKey and generated by a DefaultCacheKeyGenerator):
@Singleton
@CacheConfig({ "cacheA", "cacheB" })
public class MyMultiCacheService {
@Cacheable("cacheA")
public MyEntity getOneA(UUID id) {
...
}
@Cacheable("cacheB")
public MyEntity getOneB(UUID id, UUID parentId) {
...
}
}
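To show why cache keys need equals() and hashCode(), and what a composite key amounts to conceptually, here is a minimal sketch. The class CompositeKey is hypothetical and only stands in for the idea of wrapping several parameters into one key; it is not the framework's ParametersKey.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical composite key built from two method parameters.
final class CompositeKey {
    private final UUID id;
    private final UUID parentId;

    CompositeKey(UUID id, UUID parentId) {
        this.id = id;
        this.parentId = parentId;
    }

    // Two keys are equal only if both components match, in the same order.
    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof CompositeKey)) {
            return false;
        }
        CompositeKey that = (CompositeKey) other;
        return Objects.equals(id, that.id) && Objects.equals(parentId, that.parentId);
    }

    // Must be consistent with equals() so that hash-based lookups find the entry.
    @Override
    public int hashCode() {
        return Objects.hash(id, parentId);
    }
}
```

Without these two methods, a second call with the same arguments would build a key that never matches the stored one, and every lookup would be a miss.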
Cache Statistics
If the cache statistics are enabled, they will be published as part of the application metrics:
- cache.eviction.weight - the sum of weights of evicted entries
- cache.evictions - the count of cache evictions
- cache.size - the estimated number of entries in the cache
- cache.gets - the number of times a cache-annotated method has returned an item (regardless of whether it was cached). This metric can be refined with the use of tags:
  - result:hit - the number of times cache lookup methods have returned a cached value
  - result:miss - the number of times cache lookup methods have returned an uncached value
If the application has multiple caches, the metrics can be filtered with the cache:my-entity tag.
Collections
Flushable Collection
A data structure that asynchronously flushes its content any time a preconfigured criterion is met. It is backed by an ArrayList<T> and is guaranteed to be thread-safe.
Configuration
Basic Properties
| Property     |   Type   | Default | Description |
| ------------ | :------: | :-----: | ----------- |
| maxCount     | Integer  |         | The maximum number of elements before flushing. Triggers a Count flush |
| maxDataSize  | Long     |         | The maximum size of the collection elements before flushing. Triggers a Data Size flush |
| flushAfter   | Duration |         | The duration before flushing. Triggers a Scheduled flush |
| threads      | Integer  |    5    | The number of threads used to execute the flush event |
| flushTimeout | Duration |   10m   | The timeout for the flush event |
Properties Template
For a collection of type my-collection, a set of default properties can be defined as a bean or in the application.yml:
collections:
flushable:
myCollection:
maxCount: 10
maxDataSize: 1mb
flushAfter: 5m
threads: 10
flushTimeout: 10m
The same configuration can be defined as:
@Factory
@Requires(missingProperty = FlushableCollectionProperties.PREFIX + "." + MyFlushableCollectionConfig.TYPE)
public class MyFlushableCollectionConfig {
public static final String TYPE = "my-collection";
@Bean
@Named(TYPE)
FlushableCollectionProperties collectionProperties() {
return FlushableCollectionProperties.builder()
.type(TYPE)
.maxCount(10)
.maxDataSize(DataSize.ofMegabytes(1).asBytes())
.flushAfter(Duration.ofMinutes(5))
.threads(10)
.flushTimeout(Duration.ofMinutes(10))
.build();
}
}
Each collection definition must have a unique name, which will be automatically normalized to kebab-case (e.g. myCollection becomes my-collection).
Action Handlers
Flush Handler
Consumer to be called when the flush event is triggered. For instance, a flush handler for a collection of integer elements can be defined as:
public class MyCollectionFlushHandler implements Consumer<FlushableCollection.Batch<Integer>> {
@Override
public void accept(FlushableCollection.Batch<Integer> batch) {
// Do something with the batch
}
}
By default, does nothing: builder.flushHandler(batch -> {})
Weigher
Function to determine the size of an element. Required to trigger a Data Size flush. For instance, a collection of integer elements can define its weigher as:
public class MyCollectionWeigher implements Function<Integer, Long> {
@Override
public Long apply(Integer element) {
// Cast to long: an int cannot be auto-boxed into the Long return type
return (long) element.toString().length();
}
}
By default, the weight is calculated after converting the element to String and counting its number of bytes using UTF-8 encoding.
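The default weight described above can be sketched as follows. The class Utf8Weigher is a hypothetical illustration of "convert to String, then count UTF-8 bytes"; it is not the library's own default implementation, and using String.valueOf for the conversion is an assumption.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical weigher: the weight of an element is the UTF-8 byte length of its String form.
class Utf8Weigher<T> implements Function<T, Long> {
    @Override
    public Long apply(T element) {
        // String.valueOf handles null by producing "null"; getBytes counts encoded bytes, not characters.
        return (long) String.valueOf(element).getBytes(StandardCharsets.UTF_8).length;
    }
}
```

Note that the byte count differs from the character count for non-ASCII text, which matters when maxDataSize is meant to approximate the size of a serialized payload.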
Success Handler
Consumer to be executed if the flush handler was successfully executed. For instance, a success handler for a collection of integer elements can be defined as:
public class MyCollectionSuccessHandler implements Consumer<FlushableCollection.Batch<Integer>> {
@Override
public void accept(FlushableCollection.Batch<Integer> batch) {
// Do something with the successful batch
}
}
By default, logs a debug message with the details of the processed batch: builder.successHandler(batch -> log.debug(...))
Failure Handler
BiConsumer to be executed if the flush handler failed. For instance, a failure handler for a collection of integer elements can be defined as:
public class MyCollectionFailureHandler implements BiConsumer<FlushableCollection.Batch<Integer>, Throwable> {
@Override
public void accept(FlushableCollection.Batch<Integer> batch, Throwable ex) {
// Do something with the failed batch
}
}
By default, logs an error message with the details of the batch and its exception: builder.failureHandler((batch, ex) -> log.error(...)).
If the flush event causes a timeout, the input Throwable will be of type java.util.concurrent.TimeoutException.
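The way a timeout can surface as a TimeoutException in the failure handler can be sketched with the standard java.util.concurrent API. The class TimedFlush and its run method are hypothetical, and the collection's actual threading model is not shown here; the sketch only demonstrates bounding a handler with Future.get and routing the outcome to a BiConsumer.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical helper that runs a flush handler with a time limit.
class TimedFlush {
    static <T> void run(T batch,
                        Consumer<T> flushHandler,
                        BiConsumer<T, Throwable> failureHandler,
                        long timeoutMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<?> future = executor.submit(() -> flushHandler.accept(batch));
        try {
            // Wait for the handler, but no longer than the configured timeout.
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException ex) {
            // The handler took too long: interrupt it and report the timeout itself.
            future.cancel(true);
            failureHandler.accept(batch, ex);
        } catch (ExecutionException ex) {
            // The handler threw: report the original exception, not the wrapper.
            failureHandler.accept(batch, ex.getCause());
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            failureHandler.accept(batch, ex);
        } finally {
            executor.shutdownNow();
        }
    }
}
```

A failure handler can therefore distinguish a slow flush from a failing one by checking whether the Throwable is an instance of TimeoutException.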
Usage
The collection must be created through the FlushableCollectionFactory bean, by providing the expected type. If the application context finds a FlushableCollectionProperties with the same name, it will be used as a template for the new collection. Otherwise, a new properties set with default values will be created. Note that any predefined property can be overridden during the build phase.
@Inject
protected FlushableCollectionFactory flushableCollectionFactory;
void submit() {
try (var collection = flushableCollectionFactory.<Integer>builder("my-collection")
.flushHandler(new MyCollectionFlushHandler())
.weigher(new MyCollectionWeigher())
.successHandler(new MyCollectionSuccessHandler())
.failureHandler(new MyCollectionFailureHandler())
.build()) {
for (int i = 0; i < 10; i++) {
collection.add(i);
}
}
}
Flush Events
- COUNT - Triggers if the collection contains more elements than the value defined in the maxCount property. If undefined, it will never be triggered.
- DATA_SIZE - Triggers if the size of the elements in the collection is greater than the value defined in the maxDataSize property. If undefined, it will never be triggered.
- SCHEDULED - Triggers based on the schedule defined with the flushAfter property. If undefined, it will never be triggered.
- MANUAL - Triggers whenever the collection.flush() method is called.
- CLOSE - Triggers whenever the collection is closed, either by using a try-with-resources statement or by calling the collection.close() method.
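The COUNT, MANUAL, and CLOSE triggers listed above can be illustrated with a deliberately simplified buffer. The class SimpleFlushBuffer is hypothetical: it flushes synchronously on the calling thread, skips empty flushes, and omits the DATA_SIZE and SCHEDULED triggers, all of which are simplifying assumptions rather than a description of the real collection.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical, synchronous simplification of a flushable collection.
class SimpleFlushBuffer<T> implements AutoCloseable {
    enum Cause { COUNT, MANUAL, CLOSE }

    private final int maxCount;
    private final BiConsumer<Cause, List<T>> handler;
    private final List<T> elements = new ArrayList<>();

    SimpleFlushBuffer(int maxCount, BiConsumer<Cause, List<T>> handler) {
        this.maxCount = maxCount;
        this.handler = handler;
    }

    synchronized void add(T element) {
        elements.add(element);
        // COUNT: flush once the buffer holds more elements than maxCount.
        if (elements.size() > maxCount) {
            flush(Cause.COUNT);
        }
    }

    // MANUAL: flush on demand.
    synchronized void flush() {
        flush(Cause.MANUAL);
    }

    // CLOSE: flush whatever is left, for example at the end of a try-with-resources block.
    @Override
    public synchronized void close() {
        flush(Cause.CLOSE);
    }

    // Hands a copy of the buffered elements to the handler and empties the buffer.
    private void flush(Cause cause) {
        if (elements.isEmpty()) {
            return;
        }
        List<T> batch = new ArrayList<>(elements);
        elements.clear();
        handler.accept(cause, batch);
    }
}
```

With maxCount set to 2, adding the values 1 to 4 and then closing the buffer would produce one COUNT batch with the first three elements and one CLOSE batch with the last one.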
Flush Metrics
Each collection will publish metrics about the duration of each flush event, its size, and the count of success/failures (see Metrics section).
This can be refined with the use of custom tags during the creation of the collection:
try (var collection = flushableCollectionFactory.<Integer>builder("my-collection")
.tag("key", "value")
.build()) {
// Do something with the collection
}
The metrics will then be available in:
- GET /metrics/pdp.collections.flushable.[type] - The count of successful and failed flush events.
- GET /metrics/pdp.collections.flushable.[type].duration - The duration for the flush handler.
- GET /metrics/pdp.collections.flushable.[type].size - The size of the flushed elements.
DSL
The PDP DSL is an abstract definition of a common language to be used in any PDP product. It is intended to provide a standardized way to express configurations, which each product may interpret differently according to its own needs.
Filters
A filter is a criterion to be applied to a given object. In order to use it, the FilterAdapter interface needs to be implemented. The core supports the following concrete adapters:
- MapFilterAdapter - Converts the filter into a predicate used to evaluate Map<String, Object> structures. The expected field name is the key of the map.
- JsonPathFilterAdapter - Converts the filter into a predicate used to evaluate DocumentContext structures. The expected field name is a JSON Path to be found in the JSON document.
- JsonPointerFilterAdapter - Converts the filter into a predicate used to evaluate ObjectNode structures. The expected field name is a JSON Pointer to be found in the JSON document.
All filters have an optional source field that can be used by the concrete implementation to select among multiple data structures.
"Equals" Filter
The value of the field must exactly match the one provided.
var filter = EqualsFilter.builder().field("field").value("value").build();
{
"equals": {
"field": "field",
"value": "value"
}
}
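As a rough illustration of what adapting an "equals" filter to a Map<String, Object> could look like, the sketch below builds a plain java.util.function.Predicate. The class EqualsMapFilter is hypothetical and does not use the FilterAdapter or MapFilterAdapter types; treating a missing key as a non-match is also an assumption.

```java
import java.util.Map;
import java.util.Objects;
import java.util.function.Predicate;

// Hypothetical stand-in for the predicate a map-based adapter might produce for an "equals" filter.
class EqualsMapFilter {
    static Predicate<Map<String, Object>> of(String field, Object value) {
        // The field name is used as the map key; a missing key never matches.
        return map -> map.containsKey(field) && Objects.equals(map.get(field), value);
    }
}
```

For the filter shown above, EqualsMapFilter.of("field", "value") would accept a map where the key field holds the string value, and reject maps where the key is missing or holds anything else.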