<!-- Space: PDPDoc -->
<!-- Parent: Intro -->
<!-- Parent: User Guide -->
<!-- Parent: Core -->
<!-- Title: Core Common -->
<h1>Binary Data Service</h1>
<p>The Binary Data Service is an abstraction that hides the complexity of storing and retrieving binary data using configurable providers.</p>
<p>Uses the concept of sessions to <em>buckets</em> in order to give a logical separation between different scopes. For example, "config" could
be a bucket to store all data related to the configurations scope, and it will be represented as a folder in the <a href="#local-file-system">local file system provider</a>.</p>
<pre><code class="language-java">@Inject
protected BinaryDataService binaryDataService;
protected BinaryDataServiceSession session;
@PostConstruct
void init() {
session = binaryDataService.newSession("config");
}
public void storeConfig(String key, InputStream inputStream) {
session.store(key, inputStream);
}
public Optional<InputStream> findConfig(String key) {
return session.find(key);
}
</code></pre>
<h2>Providers</h2>
<h3>Local File System</h3>
<p>The default provider. Uses the local file system, or any mounted drive as the storage:</p>
<pre><code class="language-yaml">storage:
binary:
provider: local
dataPath: other
</code></pre>
<h4>Configuration parameters:</h4>
<p><code>storage.binary.provider</code></p>
<p>(Optional, String) If given, <strong>must</strong> be <code>local</code>.</p>
<p><code>storage.binary.dataPath</code></p>
<p>(Optional, String) The path where the data will be stored. Default is <code>${tmp}/binary</code>,
where <code>${tmp}</code> is the default temporary folder of the OS where the application runs.</p>
<h3>SFTP</h3>
<p>Uses a remote SFTP server as storage:</p>
<pre><code class="language-yaml">storage:
binary:
provider: sftp
dataPath: other
server: sftp://user:pass@localhost:22
connection:
keepAliveInterval: 1m
connectTimeout: 30s
readTimeout: 30s
</code></pre>
<blockquote>
<p>Keep in mind that SFTP is known to be slow. <strong>Use it only as last resource</strong></p>
</blockquote>
<h4>Configuration parameters:</h4>
<p><code>storage.binary.provider</code></p>
<p>(Required, String) <strong>Must</strong> be <code>sftp</code>.</p>
<p><code>storage.binary.dataPath</code></p>
<p>(Optional, String) The path where the data will be stored. Default is <code>/binary</code>.</p>
<p><code>storage.binary.server</code></p>
<p>(Optional, String) The URI to the SFTP server. Default is <code>sftp://pdp:pdp@localhost:22</code>.</p>
<p><code>storage.binary.connection.keepAliveInterval</code></p>
<p>(Optional, Duration) The interval to send a keep alive signal to the server. Default is <code>30s</code>.</p>
<p><code>storage.binary.connection.connectTimeout</code></p>
<p>(Optional, Duration) The timeout for the connection to the server. Default is <code>5s</code>.</p>
<p><code>storage.binary.connection.readTimeout</code></p>
<p>(Optional, Duration) The timeout for the reading from the server. Default is <code>3s</code>.</p>
<h1>Cache Manager</h1>
<p>The local cache is an extension to the one provided by Micronaut and implemented with <a href="https://github.com/ben-manes/caffeine">Caffeine</a>.
It simplifies the process of caching entities with the use of annotations and configurations in the <code>application.yml</code></p>
<h2>Configuration</h2>
<pre><code class="language-yaml">storage:
cache:
local:
myEntity:
initialCapacity: 10
maximumSize: 50
maximumWeight: 100
expireAfterWrite: 1h
expireAfterAccess: 5m
recordStats: true
testMode: false
</code></pre>
<p>For a cache named <code>myEntity</code> with the following properties:</p>
<p>| Property | Type | Default | Description |
| ----------------- | :------: | :-----: | ----------- |
| initialCapacity | Integer | 16 | The minimum size of the cache |
| maximumSize | Long | | The maximum size of the cache. <strong>Can not be combined with a weigher</strong> |
| maximumWeight | Long | | The maximum weight to be allowed for an element in the cache (see <a href="#weigher">Weigher</a> section) |
| expireAfterWrite | Duration | | The time to wait to expire an element after its creation |
| expireAfterAccess | Duration | 5m | The time to wait to expire an element after the last time it was accessed |
| recordStats | boolean | true | To record statistics about hit rates and evictions (see <a href="#cache-statistics">Cache Statistics</a> section) |
| testMode | boolean | false | To execute all cache operations in a single thread |</p>
<blockquote>
<p>Each cache <strong>must</strong> have a unique name, which will be automatically normalized to <em>kebab-case</em> (i.e. <code>myEntity</code> becomes <code>my-entity</code>)</p>
</blockquote>
<p>A default configuration for a cache can be defined as a bean:</p>
<pre><code class="language-java">@Factory
@Requires(missingProperty = LocalCacheProperties.PREFIX + "." + MyEntityCacheConfig.CACHE_NAME)
public class MyEntityCacheConfig {
public static final String CACHE_NAME = "my-entity";
@Bean
@Named(CACHE_NAME)
LocalCacheProperties cacheProperties(ApplicationConfiguration applicationConfiguration) {
return LocalCacheProperties.builder()
.cacheName(CACHE_NAME)
.applicationConfiguration(applicationConfiguration)
.build();
}
}
</code></pre>
<h3>Weigher</h3>
<p>A weigher determines if an element becomes too <em>heavy</em> to be in the cache. If for a cache's <code>maximumWeight</code> there is
not a corresponding <strong>named</strong> weigher, any other weigher will be selected. If there is no registered weigher, a default
weigher where every element has a weight of 1 will be used:</p>
<pre><code class="language-java">import com.github.benmanes.caffeine.cache.*;
@Singleton
@Named(MyEntityCacheConfig.CACHE_NAME)
public class MyEntityCacheWeigher implements Weigher<UUID, MyEntity> {
@Override
public @NonNegative int weigh(@NonNull UUID key, @NonNull MyEntity value) {
return 0;
}
}
</code></pre>
<h3>Removal Listener</h3>
<p>A listener that triggers every time an element is evicted:</p>
<pre><code class="language-java">import com.github.benmanes.caffeine.cache.*;
@Singleton
@Named(MyEntityCacheConfig.CACHE_NAME)
public class MyEntityCacheRemovalListener implements RemovalListener<UUID, MyEntity> {
@Override
public void onRemoval(@Nullable UUID key, @Nullable MyEntity value, @NonNull RemovalCause cause) {
// Do something with the event
}
}
</code></pre>
<h2>Annotation-based caching</h2>
<p>Any class (POJOs, Connections, Factories...) can be stored in the cache. For example, instances of the following
entity type:</p>
<pre><code class="language-java">@Data
public class MyEntity implements CoreEntity<UUID> {
private UUID id;
private String name;
...
}
</code></pre>
<p>can be cached in any managed bean with the use of <code>io.micronaut.cache.annotation</code> annotations:</p>
<ul>
<li><code>@Cacheable</code></li>
<li><code>@CachePut</code></li>
<li><code>@CacheInvalidate</code></li>
</ul>
<pre><code class="language-java">@Singleton
@CacheConfig(MyEntityCacheConfig.CACHE_NAME)
public class MyEntityService {
@Inject
protected MyEntityRepository myEntityRepository;
@Cacheable(keyGenerator = CoreEntityKeyGenerator.class)
public List<MyEntity> getAll() {
return myEntityRepository.getAll();
}
@Cacheable
public MyEntity getOne(UUID id) {
return myEntityRepository.getOne(id);
}
@CachePut(keyGenerator = CoreEntityKeyGenerator.class)
public void store(MyEntity myEntity) {
myEntityRepository.store(myEntity);
}
@CacheInvalidate
public MyEntity delete(UUID id) {
return myEntityRepository.delete(id);
}
}
</code></pre>
<blockquote>
<p>The key for the cacheable object <strong>must</strong> implement <code>equals()</code> and <code>hashCode()</code></p>
</blockquote>
<p>Note that for the <code>getAll()</code> and <code>store(MyEntity)</code> methods, a custom key generator needs to be specified.
This way the cache will calculate the appropriate <code>key</code> for each entity. If no generator is defined,
the <code>DefaultCacheKeyGenerator</code> is used.</p>
<blockquote>
<p>The <code>CoreEntityKeyGenerator</code> can be used with any entity that implements <code>CoreEntity<T></code></p>
</blockquote>
<p>Multiple caches can be configured in the same <code>@CacheConfig</code>, in which case, the name of the used
cache <strong>must</strong> be specified. Likewise, the key for the cached value can be a composite of multiple
objects (internally wrapped as a <code>ParametersKey</code> and generated by a <code>DefaultCacheKeyGenerator</code>):</p>
<pre><code class="language-java">@Singleton
@CacheConfig({ "cacheA", "cacheB" })
public class MyMultiCacheService {
@Cacheable("cacheA")
public MyEntity getOneA(UUID id) {
...
}
@Cacheable("cacheB")
public MyEntity getOneB(UUID id, UUID parentId) {
...
}
}
</code></pre>
<h2>Cache Statistics</h2>
<p>If the cache statistics are enabled, they will be published as part of the application <a href="#metrics">metrics</a>:</p>
<ul>
<li><code>cache.eviction.weight</code> - the sum of weights of evicted entries</li>
<li><code>cache.evictions</code> - the count of cache evictions</li>
<li><code>cache.size</code> - the <strong>estimated</strong> number of entries in the cache</li>
<li><code>cache.gets</code> - the number of times a cache-annotated method has returned an item (regardless if it was cached or not). This metric can be refined
with the use of tags:
<ul>
<li><code>result:hit</code> - the number of times cache lookup methods have returned a cached value</li>
<li><code>result:miss</code> - the number of times cache lookup methods have returned an uncached value</li>
</ul>
</li>
</ul>
<blockquote>
<p>If the application has multiple caches, the metrics can be filtered with the <code>cache:my-entity</code> tag</p>
</blockquote>
<h1>Collections</h1>
<h2>Flushable Collection</h2>
<p>A data structure that asynchronously flushes its content any time a preconfigured criteria is met. It is backed up by an <code>ArrayList<T></code>
and it is guaranteed to be <em>thread-safe</em>.</p>
<h3>Configuration</h3>
<h4>Basic Properties</h4>
<p>| Property | Type | Default | Description |
| ------------ | :------: | :-----: | ----------- |
| maxCount | Integer | | The maximum number of elements before flushing. Triggers a <a href="#flush-events">Count</a> flush |
| maxDataSize | Long | | The maximum size of the collection elements before flushing. Triggers a <a href="#flush-events">Data Size</a> flush |
| flushAfter | Duration | | The duration before flushing. Triggers a <a href="#flush-events">Scheduled</a> flush |
| threads | Integer | 5 | The number of threads used to execute the flush event |
| flushTimeout | Duration | 10m | The timeout for the flush event |</p>
<h5>Properties Template</h5>
<p>For a collection of type <code>my-collection</code>, a set of default properties can be defined as a bean or in the <code>application.yml</code>:</p>
<pre><code class="language-yaml">collections:
flushable:
myCollection:
maxCount: 10
maxDataSize: 1mb
flushAfter: 5m
threads: 10
flushTimeout: 10m
</code></pre>
<p>The same configuration can be defined as:</p>
<pre><code class="language-java">@Factory
@Requires(missingProperty = FlushableCollectionProperties.PREFIX + "." + MyFlushableCollectionConfig.TYPE)
public class MyFlushableCollectionConfig {
public static final String TYPE = "my-collection";
@Bean
@Named(TYPE)
FlushableCollectionProperties collectionProperties() {
return FlushableCollectionProperties.builder()
.type(TYPE)
.maxCount(10)
.maxDataSize(DataSize.ofMegabytes(1).asBytes())
.flushAfter(Duration.ofMinutes(5))
.threads(10)
.flushTimeout(Duration.ofMinutes(10))
.build();
}
}
</code></pre>
<blockquote>
<p>Each collection definition <strong>must</strong> have a unique name, which will be automatically normalized to <em>kebab-case</em> (i.e. <code>myCollection</code> becomes <code>my-collection</code>)</p>
</blockquote>
<h4>Action Handlers</h4>
<h5>Flush Handler</h5>
<p>Consumer to be called when the flush event is triggered. For instance, a flush handler for a collection
of integer elements can be defined as:</p>
<pre><code class="language-java">public class MyCollectionFlushHandler implements Consumer<FlushableCollection.Batch<Integer>> {
@Override
public void accept(FlushableCollection.Batch<Integer> batch) {
// Do something with the batch
}
}
</code></pre>
<p>By default, does nothing: <code>builder.flushHandler(batch -> {})</code></p>
<h5>Weigher</h5>
<p>Function to determine the size of an element. Required to trigger a <a href="#data-size-flush">Data Size</a> flush.
For instance, a collection of integer elements can define its weigher as:</p>
<pre><code class="language-java">public class MyCollectionWeigher implements Function<Integer, Long> {
@Override
public Long apply(Integer element) {
return element.toString().length();
}
}
</code></pre>
<p>By default, the weight is calculated after converting the element to String and counting its number of bytes
using <em>UTF-8</em> encoding.</p>
<h5>Success Handler</h5>
<p>Consumer to be executed if the <a href="#flush-handler">flush handler</a> was successfully executed. For instance,
a success handler for a collection of integer elements can be defined as:</p>
<pre><code class="language-java">public class MyCollectionSuccessHandler implements Consumer<FlushableCollection.Batch<Integer>> {
@Override
public void accept(FlushableCollection.Batch<Integer> batch) {
// Do something with the successful batch
}
}
</code></pre>
<p>By default, logs a debug message with the details of the processed batch: <code>builder.successHandler(batch -> log.debug(...))</code></p>
<h5>Failure Handler</h5>
<p>BiConsumer to be executed if the <a href="#flush-handler">flush handler</a> failed. For instance, a failure handler for a collection
of integer elements can be defined as:</p>
<pre><code class="language-java">public class MyCollectionFailureHandler implements Consumer<FlushableCollection.Batch<Integer>, Throwable> {
@Override
public void accept(FlushableCollection.Batch<Integer> batch, Throwable ex) {
// Do something with the failed batch
}
}
</code></pre>
<p>By default, logs an error message with the details of the batch and its exception: <code>builder.failureHandler((batch, ex) -> log.error(...))</code>.
If the flush event causes a timeout, the input <code>Throwable</code> will be of type <code>java.util.concurrent.TimeoutException</code>.</p>
<h3>Usage</h3>
<p>The collection <strong>must</strong> be created through the <code>FlushableCollectionFactory</code> bean, by providing the expected type. If the application
context finds a <code>FlushableCollectionProperties</code> with the same name, it will be used as template for the new collection. Otherwise, a
new properties set with default values will be created. Note that any pre-defined property can be overridden during the <em>build</em> phase.</p>
<pre><code class="language-java">@Inject
protected FlushableCollectionFactory flushableCollectionFactory;
void submit() {
try (var collection = flushableCollectionFactory.<Integer>builder("my-collection")
.flushHandler(new MyCollectionFlushHandler())
.weigher(new MyCollectionWeigher())
.successHandler(new MyCollectionSuccessHandler())
.failureHandler(new MyCollectionFailureHandler())
.build()) {
for (int i = 0; i < 10; i++) {
collection.add(i);
}
}
}
</code></pre>
<h3>Flush Events</h3>
<ul>
<li><code>COUNT</code> - Triggers if the collection contains more elements than the value defined in the <code>maxCount</code> property.
If undefined, it will never be triggered.</li>
<li><code>DATA_SIZE</code> - Triggers if the size of the elements in the collection is greater than the value defined in the
<code>maxDataSize</code> property. If undefined, it will never be triggered.</li>
<li><code>SCHEDULED</code> - Triggers based on the schedule defined with the <code>flushAfter</code> property. If undefined, it will never be triggered.</li>
<li><code>MANUAL</code> - Triggers whenever the <code>collection.flush()</code> method is called.</li>
<li><code>CLOSE</code> - Triggers whenever the collection is closed by either using a <code>try</code> with resources, or by calling
the <code>collection.close()</code> method.</li>
</ul>
<h3>Flush Metrics</h3>
<p>Each collection will publish metrics about the duration of each flush event, its size, and the count of success/failures (see <a href="#metrics">Metrics</a> section).</p>
<p>This can be refined with the use of custom tags during the creation of the collection:</p>
<pre><code class="language-java">try (var collection = flushableCollectionFactory.<Integer>builder("my-collection")
.tag("key", "value")
.build()) {
// Do something with the collection
}
</code></pre>
<p>The metrics will then be available in:</p>
<ul>
<li><code>GET /metrics/pdp.collections.flushable.[type]</code> - The count of successful and failed flush events.</li>
<li><code>GET /metrics/pdp.collections.flushable.[type].duration</code> - The duration for the flush handler.</li>
<li><code>GET /metrics/pdp.collections.flushable.[type].size</code> - The size of the flushed elements.</li>
</ul>
<h1>DSL</h1>
<p>The PDP DSL is an abstract definition for a common language to be used in any PDP product. Its intention is to have a
standardized way to express configurations that might be interpreted in a different way according to the needs of the
product itself.</p>
<h2>Filters</h2>
<p>A filter is a criteria to be applied to a given object. In order to use it, the <code>FilterAdapter</code> interface needs to be
implemented. The core supports the following concrete adapters:</p>
<ul>
<li><code>MapFilterAdapter</code> - Converts the filter into a predicate used to evaluate <code>Map<String, Object></code> structures. The
expected field name is the key of the map.</li>
<li><code>JsonPathFilterAdapter</code> - Converts the filter into a predicate used to evaluate <code>DocumentContext</code> structures. The expected
field name is a <a href="https://goessner.net/articles/JsonPath">JSON Path</a> to be found in the JSON document.</li>
<li><code>JsonPointerFilterAdapter</code> - Converts the filter into a predicate used to evaluate <code>ObjectNode</code> structures. The expected
field name is a <a href="https://www.rfc-editor.org/rfc/rfc6901">JSON Pointer</a> to be found in the JSON document.</li>
</ul>
<blockquote>
<p>All filters have an optional <code>source</code> field that could be used by the concrete implementation to select among multiple
data structures</p>
</blockquote>
<h3>"Equals" Filter</h3>
<p>The value of the field must be exactly as the one provided.</p>
<pre><code class="language-java">var filter = EqualsFilter.builder().field("field").value("value").build();
</code></pre>
<pre><code class="language-json">{
"equals": {
"field": "field",
"value": "value"
}
}
</code></pre>
<h3>"Greater Than" Filter</h3>
<p>The value of the field must be greater than the one provided.</p>
<pre><code class="language-java">var filter = GreaterThanFilter.builder().field("field").value(1).build();
</code></pre>
<pre><code class="language-json">{
"gt": {
"field": "field",
"value": 1
}
}
</code></pre>
<h3>"Greater Than or Equals" Filter</h3>
<p>The value of the field must be greater than or equals to the one provided.</p>
<pre><code class="language-java">var filter = GreaterThanOrEqualsFilter.builder().field("field").value(1).build();
</code></pre>
<pre><code class="language-json">{
"gte": {
"field": "field",
"value": 1
}
}
</code></pre>
<h3>"Less Than" Filter</h3>
<p>The value of the field must be less than the one provided.</p>
<pre><code class="language-java">var filter = LessThanFilter.builder().field("field").value(1).build();
</code></pre>
<pre><code class="language-json">{
"lt": {
"field": "field",
"value": 1
}
}
</code></pre>
<h3>"Less Than or Equals" Filter</h3>
<p>The value of the field must be less than or equals to the one provided.</p>
<pre><code class="language-java">var filter = LessThanOrEqualsFilter.builder().field("field").value(1).build();
</code></pre>
<pre><code class="language-json">{
"lte": {
"field": "field",
"value": 1
}
}
</code></pre>
<h3>"In" Filter</h3>
<p>The value of the field must be one of the provided values.</p>
<pre><code class="language-java">var filter = InFilter.builder().field("field").value("valueA").value("valueB").build();
</code></pre>
<pre><code class="language-json">{
"in": {
"field": "field",
"values": [
"valueA",
"valueB"
]
}
}
</code></pre>
<h3>"Empty" Filter</h3>
<p>Checks if a field is empty:</p>
<ul>
<li>For a collection, <code>true</code> if its size is 0</li>
<li>For a String, <code>true</code> if its length is 0</li>
<li>For any other type, <code>true</code> if it is <code>null</code></li>
</ul>
<pre><code class="language-java">var filter = EmptyFilter.builder()
.field("field")
.build();
</code></pre>
<pre><code class="language-json">{
"empty": {
"field": "field"
}
}
</code></pre>
<h3>"Exists" Filter</h3>
<p>Checks if a field exists.</p>
<pre><code class="language-java">var filter = ExistsFilter.builder()
.field("field")
.build();
</code></pre>
<pre><code class="language-json">{
"exists": {
"field": "field"
}
}
</code></pre>
<h3>"Not" Filter</h3>
<p>Negates the inner clause.</p>
<pre><code class="language-java">var filter = NotFilter.builder()
.clause(EqualsFilter.builder().field("field").value("value").build())
.build();
</code></pre>
<pre><code class="language-json">{
"not": {
"equals": {
"field": "field",
"value": "value"
}
}
}
</code></pre>
<h3>"Null" Filter</h3>
<p>Checks if a field is null. Note that while the "exists" filter checks whether the field is present or not, the "null" filter
expects the field to be present but with <code>null</code> value.</p>
<pre><code class="language-java">var filter = NullFilter.builder()
.field("field")
.build();
</code></pre>
<pre><code class="language-json">{
"null": {
"field": "field"
}
}
</code></pre>
<h3>Boolean Operators</h3>
<h4>"And" Filter</h4>
<p>All conditions in the list <strong>must</strong> be evaluated to <code>true</code>.</p>
<pre><code class="language-java">var filter = AndFilter.builder()
.clause(EqualsFilter.builder().field("fieldA").value("valueA").build())
.clause(EqualsFilter.builder().field("fieldB").value("valueB").build())
.build();
</code></pre>
<pre><code class="language-json">{
"and": [
{
"equals": {
"field": "fieldA",
"value": "valueA"
}
}, {
"equals": {
"field": "fieldB",
"value": "valueB"
}
}
]
}
</code></pre>
<h4>"Or" Filter</h4>
<p>At least one condition in the list <strong>must</strong> be evaluated to <code>true</code>.</p>
<pre><code class="language-java">var filter = OrFilter.builder()
.clause(EqualsFilter.builder().field("fieldA").value("valueA").build())
.clause(EqualsFilter.builder().field("fieldB").value("valueB").build())
.build();
</code></pre>
<pre><code class="language-json">{
"or": [
{
"equals": {
"field": "fieldA",
"value": "valueA"
}
}, {
"equals": {
"field": "fieldB",
"value": "valueB"
}
}
]
}
</code></pre>
<h1>HTTP Base Client</h1>
<p>The HTTP Base Client is an abstraction of the <a href="https://square.github.io/okhttp/4.x/okhttp/okhttp3/-ok-http-client/">OkHttpClient</a>.
It provides features such as retry handlers and automated paginated requests.</p>
<p>The minimum initialization for the client defines the configuration of the HTTP connections and other shared
properties to be used by the endpoints:</p>
<pre><code class="language-java">public class MyClient extends HttpBaseClient {
protected MyClient(
@NonNull OkHttpClient client,
@NonNull ObjectMapper objectMapper,
@NonNull BackoffPolicyProperties backoffPolicy,
MeterRegistry meterRegistry,
@NonNull ScrollProperties scroll)
{
super(client, objectMapper, backoffPolicy, scroll);
}
@Override
public boolean ping() {
return true;
}
public static Builder builder() {
return new Builder();
}
public static final class Builder extends HttpBaseClientBuilder<Builder, MyClient> {
@Override
public MyClient build() {
return new MyClient(
newClientBuilder().build(),
getObjectMapper(),
getBackoffPolicy(),
getScroll());
}
}
}
</code></pre>
<p>The builder can be also initialized with a <code>HttpBaseClientProperties</code> instance:</p>
<pre><code class="language-java">public class MyClient extends HttpBaseClient {
...
public static Builder builder(HttpBaseClientProperties clientProperties) {
return new Builder(clientProperties);
}
@NoArgsConstructor
public static final class Builder extends HttpBaseClientBuilder<Builder, MyClient> {
protected Builder(HttpBaseClientProperties clientProperties) {
super(clientProperties);
}
...
}
}
</code></pre>
<p>If some configurations are not desired on the implemented client, they can be disabled in the builder:</p>
<pre><code class="language-java">public class MyClient extends HttpBaseClient {
...
@HttpBaseClientBuilder.DisabledConfig({
HttpBaseClientBuilder.Config.COMPRESS_REQUESTS,
HttpBaseClientBuilder.Config.FOLLOW_REDIRECTS,
HttpBaseClientBuilder.Config.SCROLL
})
public static final class Builder extends HttpBaseClientBuilder<Builder, MyClient> {
...
}
}
</code></pre>
<p>any attempt to set a disabled configuration will throw an <code>UnsupportedOperationException</code>.</p>
<p>By default, the retry of the <code>executeWithBackoff(...)</code> method will happen if the HTTP response code is:</p>
<ul>
<li>408 - Request Time Out</li>
<li>429 - Too Many Request</li>
<li>500 - Internal Server Error</li>
<li>504 - Gateway Timeout</li>
</ul>
<p>The condition can be changed by sending a different predicate to the method:</p>
<pre><code class="language-java">client.executeWithBackoff(requestDetails, response -> false);
</code></pre>
<p>In this case, the request will not be retried because of a status code (although it might be retried because of an exception during the execution).</p>
<h2>HTTP Round Robin Client</h2>
<p>The HTTP Round Robin Client is a specialization of the <code>HttpBaseClient</code> backed up by a round-robin collection.
Provides the corresponding classes as described in the previous section: <code>HttpRoundRobinClient</code>, <code>HttpRoundRobinClientBuilder</code>,
and <code>HttpRoundRobinClientProperties</code>.</p>
<h2>HTTP Client Request</h2>
<p>The request object is a simple POJO with the properties needed to execute the HTTP call:</p>
<pre><code class="language-java">@RequiredArgsConstructor
public class MyRequest implements HttpClientRequest {
private final String id;
}
</code></pre>
<blockquote>
<p>For cases when no data is needed in the request, an empty implementation is avaliable in <code>HttpRoundRobinClient#EMPTY_REQUEST</code></p>
</blockquote>
<h3>HTTP Client Pageable Request</h3>
<p>The <code>HttpClientPageableRequest</code> interface is an extension of <code>HttpClientRequest</code> and defines default methods to handle pagination parameters.</p>
<h2>HTTP Client Response</h2>
<h3>Single Response</h3>
<p>Expected when the response is a single payload.</p>
<pre><code class="language-java">public class MySinglePayloadResponse extends AbstractSingleResponse<MyRequest> {
public MySinglePayloadResponse(
ObjectMapper objectMapper,
HttpClientRequestDetails<MyRequest> requestDetails,
Response httpResponse
) {
super(objectMapper, requestDetails, httpResponse);
}
@Override
protected void handleSuccess(Response httpResponse) {
// Do something with the response
}
}
</code></pre>
<p>With this definition, the endpoint can be exposed in the client:</p>
<pre><code class="language-java">public class MyClient extends HttpRoundRobinClient {
...
public MySinglePayloadResponse get(MyRequest clientRequest) {
var url = serverHosts.next()
.newBuilder()
.addPathSegment("content")
.addPathSegment(request.getId())
.build();
var requestDetails = new HttpClientRequestDetails<>(
new Request.Builder().url(url).get().build(),
clientRequest);
return new MySinglePayloadResponse(
objectMapper,
requestDetails,
executeWithBackoff(requestDetails)
);
}
...
}
</code></pre>
<p>Both <code>handleSuccess(Response)</code> and <code>handleFailure(Response)</code> have a default implementation that can be overridden.
A response is considered a success if its HTTP status code is in the 200 to 299 range and its body is not <code>null</code>
(not to be confused with an empty body). This check can be changed in the constructor:</p>
<pre><code class="language-java">public class MySinglePayloadResponse extends AbstractSingleResponse<MyRequest> {
public MySinglePayloadResponse(
ObjectMapper objectMapper,
HttpClientRequestDetails<MyRequest> requestDetails,
Response httpResponse
) {
super(objectMapper, requestDetails, httpResponse, r -> r.code() == 400);
}
}
</code></pre>
<p>If the response was a failure, but it had a payload, it can be retrieved with <code>AbstractSingleResponse#getErrorMessage()</code>,
or <code>AbstractSingleResponse#getErrorMessage(Class)</code> (to convert the payload into the given class type).</p>
<h3>Iterable Response</h3>
<p>Expected when the response is a full collection of iterable elements with no pagination. It is assumed that the payload
of the response is a JSON array (if that's not the case, the <code>handleSuccess(Response httpResponse)</code> method <strong>must</strong> be
overridden).</p>
<pre><code class="language-java">public class MyIterableResponse extends AbstractIterableResponse<MyRequest, String> {
public MyIterableResponse(
ObjectMapper objectMapper,
HttpClientRequestDetails<MyRequest> requestDetails,
Response httpResponse
) {
super(objectMapper, requestDetails, httpResponse);
}
@Override
protected String parseElement(JsonNode jsonNode) {
return jsonNode.asText();
}
}
</code></pre>
<p>With this definition, the endpoint can be exposed in the client:</p>
<pre><code class="language-java">public class MyClient extends HttpRoundRobinClient {
...
public MyIterableResponse list(MyRequest clientRequest) {
var url = serverHosts.next()
.newBuilder()
.addPathSegment("list")
.addPathSegment(request.getId())
.build();
var requestDetails = new HttpClientRequestDetails<>(
new Request.Builder().url(url).get().build(),
clientRequest);
return new MyIterableResponse(
objectMapper,
requestDetails,
executeWithBackoff(requestDetails)
);
}
...
}
</code></pre>
<p>Each element in the response can then be retrieved by using the <code>Iterable<></code> interface.</p>
<p>The predicate to determine if a response was successful can be provided in the constructor, the same way as in the
previous sections.</p>
<h3>Paginated Response</h3>
<h4>Token Page</h4>
<p>Expected when the response is a consecutive list of pages obtained with a token.</p>
<h5>Token Page Response</h5>
<p>The response corresponds to each page from the pagination requests. Since it is not possible to automatically determine
where is the token for the next page, the <code>AbstractTokenPageResponse#handleSuccess(Response)</code> method <strong>must</strong> be implemented.</p>
<pre><code class="language-java">public class MyTokenPageResponse extends AbstractTokenPageResponse<MyRequest, String> {
public MyTokenPageResponse(
ObjectMapper objectMapper,
HttpClientRequestDetails<MyRequest> requestDetails,
Response httpResponse,
String currentToken
) {
super(objectMapper, requestDetails, httpResponse, currentToken);
}
@Override
protected void handleSuccess(Response response) {
try {
var json = objectMapper.readTree(Objects.requireNonNull(response.body()).byteStream());
this.nextPageReference = json.get("token").asText();
if (json.has("content")) {
this.elements = new JsonLazyIterator<>(json.get("content"), this::parseElement);
} else {
this.elements = Collections.emptyIterator();
}
} catch (Exception ex) {
throw new DataException(ex.getMessage(), ex);
}
}
@Override
protected String parseElement(JsonNode jsonNode) {
return jsonNode.asText();
}
}
</code></pre>
<p>The predicate to determine if a response was successful can be provided in the constructor, the same way as in the
previous sections.</p>
<h5>Token Page Collection</h5>
<p>The response corresponds to a collection of all the page responses that iterates through the data.</p>
<pre><code class="language-java">public class MyTokenPageCollectionResponse extends AbstractTokenCollectionResponse<MyRequest, String, MyTokenPageResponse> {
public MyTokenPageCollectionResponse(Function<String, MyTokenPageResponse> pageFunction) {
super(pageFunction);
}
}
</code></pre>
<p>With this definition, the endpoint can be exposed in the client:</p>
<pre><code class="language-java">public class MyClient extends HttpRoundRobinClient {
...
public MyTokenPageCollectionResponse paginatedByToken(MyRequest clientRequest) {
return new MyTokenPageCollectionResponse(
token -> {
var urlBuilder = serverHosts.next()
.newBuilder()
.addPathSegment("list")
.addPathSegment(request.getId())
.addQueryParameter("limit", String.valueOf(scroll.getSize()));
if (token != null) {
urlBuilder.addQueryParameter("token", token);
}
var requestDetails = new HttpClientRequestDetails<>(
new Request.Builder().url(urlBuilder.build()).get().build(),
clientRequest);
return new MyTokenPageResponse(
objectMapper,
requestDetails,
executeWithBackoff(requestDetails),
token
);
}
);
}
...
}
</code></pre>
<p>Note that in case of a <code>null</code> token, the function <strong>must</strong> return the first page.</p>
<p>Each element in the response can then be retrieved by using the <code>Iterable<></code> interface. The pagination will be handled
behind the scenes.</p>
<h4>Offset Page</h4>
<p>Expected when the response is a consecutive list of pages obtained with a page number.</p>
<h5>Offset Page Response</h5>
<p>The response corresponds to each page from the pagination requests. It is assumed that the payload of the response
is a JSON array (if that's not the case, the <code>handleSuccess(Response httpResponse)</code> method <strong>must</strong> be
overridden).</p>
<p>It is also assumed that the <code>AbstractOffsetPageResponse#getNextPageReference()</code> is always 1 after the current page.
This method can be overridden for custom behaviors.</p>
<pre><code class="language-java">public class MyOffsetPageResponse extends AbstractOffsetPageResponse<MyRequest, String> {
public MyOffsetPageResponse(
ObjectMapper objectMapper,
HttpClientRequestDetails<MyRequest> requestDetails,
Response httpResponse,
Long currentOffset
) {
super(objectMapper, requestDetails, httpResponse, currentOffset);
}
@Override
protected String parseElement(JsonNode jsonNode) {
return jsonNode.asText();
}
}
</code></pre>
<p>The predicate to determine if a response was successful can be provided in the constructor, the same way as in the
previous sections.</p>
<h5>Pageable Page Response</h5>
<p>The response is a specialization of the <em>Offset Page Response</em>, and it is based on a default page structure:</p>
<pre><code class="language-json">{
"totalPages": 0,
"totalElements": 0,
"size": 0,
"content": [
{}
],
"number": 0,
"first": true,
"last": true,
"sort": {
"sorted": true,
"unsorted": true,
"empty": true
},
"numberOfElements": 0,
"pageable": {
"page": 0,
"size": 0,
"sort": [
"string"
]
},
"empty": true
}
</code></pre>
<h5>Offset Page Collection</h5>
<p>The response corresponds to a collection of all the page responses that iterates through the data.</p>
<pre><code class="language-java">public class MyOffsetPageCollectionResponse extends AbstractOffsetCollectionResponse<MyRequest, String, MyOffsetPageResponse> {
public MyOffsetPageCollectionResponse(Function<String, MyOffsetPageResponse> pageFunction) {
super(pageFunction);
}
}
</code></pre>
<p>With this definition, the endpoint can be exposed in the client:</p>
<pre><code class="language-java">public class MyClient extends HttpRoundRobinClient {
...
public MyOffsetPageCollectionResponse paginatedByOffset(MyRequest clientRequest) {
return new MyOffsetPageCollectionResponse(
offset -> {
var urlBuilder = serverHosts.next()
.newBuilder()
.addPathSegment("list")
.addPathSegment(request.getId())
.addQueryParameter("limit", String.valueOf(scroll.getSize()));
if (token != null) {
urlBuilder.addQueryParameter("page", String.valueOf(offset));
}
var requestDetails = new HttpClientRequestDetails<>(
new Request.Builder().url(urlBuilder.build()).get().build(),
clientRequest);
return new MyOffsetPageResponse(
objectMapper,
requestDetails,
executeWithBackoff(requestDetails),
offset
);
}
);
}
...
}
</code></pre>
<p>Note that in case of a <code>null</code> token, the function <strong>must</strong> return the first page.</p>
<p>Each element in the response can then be retrieved by using the <code>Iterable<></code> interface. The pagination will be handled
behind the scenes.</p>
<h1>JSON</h1>
<p>JSON handling is done with <a href="https://github.com/FasterXML/jackson">Jackson Project</a>, and the default configuration for
the mapper can be instantiated with <code>com.pureinsights.pdp.core.config.CoreSerializer.newInstance()</code>.</p>
<h2>Default Object Mapper</h2>
<h3>Registered Modules</h3>
<ul>
<li><code>JavaTimeModule</code> to handle the <a href="https://jcp.org/en/jsr/detail?id=310">JSR-310: Date and Time API</a></li>
</ul>
<h3>Registered Deserializers</h3>
<ul>
<li><code>com.pureinsights.pdp.core.time.CoreDurationDeserializer</code> as an extension of the default <code>DurationDeserializer</code> to
allow values in human-readable format (i.e. "1s", "12m", "5h"...). <strong>If no suffix is provided, milliseconds are assumed</strong>.
The supported suffixes are:
<ul>
<li><code>d</code> - days</li>
<li><code>h</code> - hours</li>
<li><code>m</code> - minutes</li>
<li><code>s</code> - seconds</li>
<li><code>ms</code> - milliseconds</li>
<li><code>ns</code> - nanoseconds</li>
</ul>
</li>
</ul>
<h3>Enabled Features</h3>
<ul>
<li><code>MapperFeature.ACCEPT_CASE_INSENSITIVE_ENUMS</code></li>
<li><code>MapperFeature.ACCEPT_CASE_INSENSITIVE_VALUES</code></li>
<li><code>DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY</code></li>
</ul>
<h3>Disabled Features</h3>
<ul>
<li><code>SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS</code></li>
<li><code>SerializationFeature.WRITE_DATES_AS_TIMESTAMPS</code></li>
<li><code>DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES</code></li>
<li><code>DeserializationFeature.ADJUST_DATES_TO_CONTEXT_TIME_ZONE</code></li>
</ul>
<h1>Logging Handler</h1>
<p>The logging handler simplifies the process of log analysis by using a JSON output with support for custom POJOs.</p>
<h2>Logback Configuration</h2>
<pre><code class="language-xml"><configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="net.logstash.logback.encoder.LogstashEncoder">
<provider class="com.pureinsights.pdp.core.logging.CoreArgumentsProvider"/>
</encoder>
</appender>
<root level="info">
<appender-ref ref="STDOUT"/>
</root>
</configuration>
</code></pre>
<h2>Custom POJO</h2>
<pre><code class="language-java">@Getter
public class MyPOJO {
private final UUID id = UUID.randomUUID();
}
</code></pre>
<pre><code class="language-java">public class MyPOJOLoggingHandler implements PojoLoggingHandler<MyPOJO> {
private static final String PARENT_LABEL = "myPOJO";
private static final String ID_LABEL = "id";
@Override
public Class<MyPOJO> getSupportedType() {
return MyPOJO.class;
}
@Override
public void write(JsonGenerator generator, MyPOJO pojo) throws IOException {
generator.writeFieldName(PARENT_LABEL);
generator.writeStartObject();
generator.writeObjectField(ID_LABEL, pojo.getId());
generator.writeEndObject();
}
}
</code></pre>
<p>In order to invoke the handler, the class type can call the logging framework:</p>
<pre><code class="language-java">log.info("Message", new MyPOJO());
log.info("Message {}", new MyPOJO());
log.info("Message {}", "with placeholder", new MyPOJO());
</code></pre>
<pre><code class="language-json">{"@timestamp":"2021-12-08T15:07:11.379Z","@version":1,"message":"Message","logger_name":"LoggerTest","thread_name":"Test worker","level":"INFO","level_value":20000,"myPOJO":{"id":"707ea126-d94b-47b6-9010-72a3acc8a786"}}
{"@timestamp":"2021-12-08T15:07:11.380Z","@version":1,"message":"Message MyPOJO$373141eb","logger_name":"LoggerTest","thread_name":"Test worker","level":"INFO","level_value":20000,"myPOJO":{"id":"398bb1df-29b3-4738-afc6-a7b9e892ce19"}}
{"@timestamp":"2021-12-08T15:07:11.381Z","@version":1,"message":"Message with placeholder","logger_name":"LoggerTest","thread_name":"Test worker","level":"INFO","level_value":20000,"myPOJO":{"id":"00ba3228-f434-4ed0-b45e-62ec9163aac4"}}
</code></pre>
<p>Note that the custom POJO is always displayed as expected. The example also demonstrates that the POJO <strong>must not</strong> have
a corresponding placeholder in the message as it will be automatically resolved by the provider. Exceptions <strong>must</strong>
be the in the last position of the input:</p>
<pre><code class="language-java">...
} catch (Exception ex) {
log.error(ex.getMessage(), new MyPOJO(), ex);
}
</code></pre>
<p>If the logging handler is part of the <em>PDP Core Common Libraries</em> project, it <strong>must</strong> be registered in the static section of the
<code>com.pureinsights.pdp.core.logging.CoreArgumentsProvider</code> class.</p>
<p>Any other logging handler under an application context, <strong>must</strong> be declared as a bean (i.e. <code>@Singleton</code>).</p>
<p>If none of the above is applicable, the handler <strong>must</strong> be registered using the <code>CoreArgumentsProvider#registerHandler(PojoLoggingHandler)</code>
method.</p>
<h1>Metrics</h1>
<p>The metrics of the application are collected and exposed by <a href="https://micrometer.io/">Micrometer</a>. They are enabled
by default in the <code>application.yml</code> and exposed through the <code>GET /metrics</code> and <code>GET /metrics/[name]</code> endpoints:</p>
<pre><code class="language-yaml">micronaut:
metrics:
enabled: true
</code></pre>
<pre><code class="language-json">{
"names": [
"http.server.requests",
"jvm.buffer.count",
"jvm.buffer.memory.used",
...,
"pdp.metric.count"
]
}
</code></pre>
<blockquote>
<p>When available, the metrics can be filtered with the use of tags: <code>GET /metrics/pdp.metric.count?tag=source:component&tag=type:value1</code></p>
</blockquote>
<blockquote>
<p>By default, all metrics include the <code>component</code> tag with the name of the application</p>
</blockquote>
<h2>Annotation-based metrics</h2>
<p>Any method in any bean can be measured with:</p>
<ul>
<li><code>@Timed</code></li>
<li><code>@Counted</code></li>
</ul>
<pre><code class="language-java">@Singleton
public class MyService {
@Timed(value = "pdp.myservice.execute", histogram = true)
public void execute(UUID id) {
...
}
@Counted("pdp.myservice.validate")
public void validate(UUID id) {
...
}
}
</code></pre>
<blockquote>
<p>The name of the metric <strong>must</strong> be in lowercase, using dot notation</p>
</blockquote>
<h2>Custom metrics</h2>
<p>Instead of using annotations, the metric registry can be injected and manually handled in any bean, with any
type of <a href="https://micrometer.io/docs/concepts">meter</a>:</p>
<pre><code class="language-java">@Singleton
public class MyCustomService {
private Counter counter;
@PostConstruct
void init(MeterRegistry meterRegistry) {
counter = Counter.builder("pdp.mycustomservice.count")
.tag("key", "value")
.register(meterRegistry);
}
public void count() {
counter.increment();
}
}
</code></pre>
<h1>Retries</h1>
<p><code>RetryUtils</code>, as its name may suggest, provide utils or methods to handle the retry of a <code>Runnable</code> or <code>Supplier</code>. It also handles the exceptions might occur during the run.</p>
<p>By default, the <code>RetryUtils</code> will use a <code>ConstantDelayRetry</code>, wich will retry 3 times with a delay of 5 seconds.</p>
<pre><code class="language-java">...
AtomicInteger atomicInteger = new AtomicInteger();
RetryUtils.retry(atomicInteger::getAndIncrement)
</code></pre>
<p>You can also specify a custom retry policy passing it through parameters.</p>
<pre><code class="language-java">...
AtomicInteger atomicInteger = new AtomicInteger();
RetryUtils.retry(atomicInteger::getAndIncrement, new ConstantDelayRetry(2, 100))
</code></pre>
<p>In case of any failure during the retry an exception will be thrown and the <code>RetryUtils</code> will handle it.
When the number of retries reaches the specified in the retry policy a <code>CoreException</code> will be thrown.</p>
<h1>Scheduler</h1>
<p>Scheduled tasks are handled by <a href="http://www.quartz-scheduler.org/">Quartz</a>. Any application can inject the
<code>org.quartz.Scheduler</code> bean to register/unregister its own cron jobs.</p>
<p>Any execution of a <code>org.quartz.Job</code> implementation will automatically perform the dependency injection of its
visible fields without the need of declaring the class as a singleton.</p>
<h1>Message Queue Provider</h1>
<p>To handle communication between the different components, PDP uses a message queue. The following is a quick overview
of the available configuration fields, examples on how to use the Message Queue classes and the current implementations
available with configuration examples.</p>
<h2>Configuration</h2>
<h3>Basic Configuration</h3>
<p>| Property | Type | Default | Description |
|---------------------------|:------:|:-------:|----------------------------------------------------------------------------------------------|
| <code>messageQueue.provider</code> | String | | The name of the MessageQueue provider to use. (e.g. rabbitmq to load the RabbitMQ provider). |</p>
<h2>Using the MessageQueueProvider</h2>
<p>The MessageQueueProvider instance can be injected as follows:</p>
<pre><code class="language-java">public class MyClass {
@Inject
MessageQueueProvider mqProvider;
...
</code></pre>
<h2>Messages</h2>
<h3>Message Properties</h3>
<p>| Property | Type | Default | Description |
|---------------|:-------------------:|:-------:|-------------------------------------------------------------------------------------------|
| <code>body</code> | byte[] | | The body of the message. |
| <code>queue</code> | String | | The queue this message was sent to or delivered from. |
| <code>type</code> | Message.MessageType | | The type of the message. Can be DIRECT or BROADCAST. See <a href="#message-types">Message Types</a>. |
| <code>properties</code> | Map<String,String> | | Map of message properties. |</p>
<h3>Message Types</h3>
<h4>Direct Messages</h4>
<p>Direct messages are sent to a given queue and are meant to be consumed by
a single consumer. The message is persisted in the queue until a consumer is available.</p>
<h4>Broadcast Messages</h4>
<p>Broadcast messages are sent to a given queue and are meant to be consumed by all
consumers available at the moment of sending. The message is not persisted if no
consumers are available.</p>
<h3>Creating a new Message</h3>
<p>The Message class provides a builder to simplify the message creation process:</p>
<pre><code class="language-java">Message newMessage = Message.builder()
.body("My message")
.type(Message.MessageType.DIRECT)
.queue("queue")
.property("key", "value")
.build();
</code></pre>
<p>Some considerations with the Message builder:</p>
<ul>
<li>There are multiple ways to set the message body. If more than one is set, only the last one will be used.</li>
<li>Every time the <code>property</code> method is called, the given property is appended to the map.</li>
<li>Calling <code>properties</code> method will overwrite any previously set properties.</li>
</ul>
<h2>Consumers</h2>
<h3>Configuration</h3>
<p>| Property | Type | Default | Description |
|---------------------------------------|:-------:|:-------:|-------------------------------------------------------------------------------|
| <code>consumer.{name}.maxMessages</code> | Integer | 5 | Maximum number of messages to consume in parallel. |
| <code>consumer.{name}.maxMessageRequeues</code> | Integer | 5 | Maximum number of times a message should be sent back to the queue for retry. |</p>
<h4>Example</h4>
<pre><code class="language-yaml">messageQueue:
consumer:
directConsumer:
maxMessages: 4
maxMessageRequeues: 6
</code></pre>
<p>This configuration can be loaded as follows</p>
<pre><code class="language-java">protected void myMethod(@Named("direct-consumer") ConsumerProperties consumerProperties) {
try {
var maxRequeues = consumerProperties.getMaxMessageRequeues();
...
}
}
</code></pre>
<h3>Registering Consumers</h3>
<p>To register a consumer do the following:</p>
<pre><code class="language-java">public class MyClass {
@Inject
MessageQueueProvider mqProvider;
...
protected void myMethod(@Named("direct-consumer") ConsumerProperties consumerProperties) {
try {
mqProvider.registerConsumer(queue, consumerProperties, Message.MessageType.DIRECT, this::consumeMessage);
...
}
}
</code></pre>
<p>The <code>registerConsumer</code> method takes the following parameters:</p>
<p>| Parameter | Type | Description |
|--------------------|:--------------------:|----------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| queue | String | Name of the queue to register the Consumer to. |
| consumerProperties | ConsumerProperties | A ConsumerProperties instance for the consumer. |
| messageType | Message.MessageType | The type of message this consumer is supposed to listen to. Can be Message.MessageType.DIRECT or Message.MessageType.BROADCAST. See <a href="#message-types">Message Types</a>. |
| onMessageDelivery | Predicate<Message> | The function to execute when a message is received. Returns true if the message is successfully consumed, false otherwise. |</p>
<h2>Producers</h2>
<h3>Configuration</h3>
<p>| Property | Type | Default | Description |
|-------------------------------|:--------:|:-------:|--------------------------------------------------------------------------------------------------|
| <code>producer.threads</code> | Integer | 5 | Number of threads available for producers to send asyncMessages. |
| <code>producer.sendMessageTimeout</code> | Duration | 30s | Maximum time to wait to get a confirmation that a message has been successfully sent. |
| <code>producer.retry.retries</code> | Integer | 5 | Maximum number of times to try to send a message. |
| <code>producer.retry.delay</code> | Duration | 1s | Time to wait before retrying. The time is multiplied by the number of retries to backoff between executions. |</p>
<h4>Example</h4>
<pre><code class="language-yaml">messageQueue:
producer:
sendMessageTimeout: 10000
threads: 10
retry:
retries: 10
delay: 10000
</code></pre>
<p>This configuration can be loaded by injecting the ProducerProperties instance:</p>
<pre><code class="language-java">public class MyClass {
@Inject
ProducerProperties producerProperties;
...
</code></pre>
<h3>Getting a Producer instance</h3>
<p>To get a new producer instance do the following:</p>
<pre><code class="language-java">public class MyClass {
@Inject
MessageQueueProvider mqProvider;
...
protected void myMethod() {
try {
var myProducer = mqProvider.getProducer();
...
}
}
</code></pre>
<h3>Sending a message</h3>
<h4>Async Messages</h4>
<pre><code class="language-java">producer.asyncSend(newMessage,
(message) -> messageSuccessHandler(message),
(message, exception) -> messageFailureHandler(message, exception));
</code></pre>
<p>The <code>asyncSend</code> method takes the following parameters:</p>
<p>| Parameter | Type | Description |
|----------------|:------------------------------:|------------------------------------------------------------------------------------------------------------------|
| message | Message | The <a href="#messages">Message</a> to send. |
| successHandler | Consumer<Message> | A consumer function to execute when the message is succesfully sent. |</p>
General
Content
Integrations