
<!-- Space: PDPDoc --> <!-- Parent: Intro --> <!-- Parent: User Guide --> <!-- Parent: Core --> <!-- Title: Core Common -->
<h1>Binary Data Service</h1>
<p>The Binary Data Service is an abstraction that hides the complexity of storing and retrieving binary data using configurable providers.</p>
<p>It uses sessions bound to <em>buckets</em> in order to give a logical separation between different scopes. For example, &quot;config&quot; could be a bucket to store all data related to the configurations scope, and it will be represented as a folder in the <a href="#local-file-system">local file system provider</a>.</p>
<pre><code class="language-java">@Inject
protected BinaryDataService binaryDataService;

protected BinaryDataServiceSession session;

@PostConstruct
void init() {
    session = binaryDataService.newSession(&quot;config&quot;);
}

public void storeConfig(String key, InputStream inputStream) {
    session.store(key, inputStream);
}

public Optional&lt;InputStream&gt; findConfig(String key) {
    return session.find(key);
}
</code></pre>
<h2>Providers</h2>
<h3>Local File System</h3>
<p>The default provider. Uses the local file system, or any mounted drive, as the storage:</p>
<pre><code class="language-yaml">storage:
  binary:
    provider: local
    dataPath: other
</code></pre>
<h4>Configuration parameters:</h4>
<p><code>storage.binary.provider</code></p>
<p>(Optional, String) If given, <strong>must</strong> be <code>local</code>.</p>
<p><code>storage.binary.dataPath</code></p>
<p>(Optional, String) The path where the data will be stored.
Default is <code>${tmp}/binary</code>, where <code>${tmp}</code> is the default temporary folder of the OS where the application runs.</p>
<h3>SFTP</h3>
<p>Uses a remote SFTP server as storage:</p>
<pre><code class="language-yaml">storage:
  binary:
    provider: sftp
    dataPath: other
    server: sftp://user:pass@localhost:22
    connection:
      keepAliveInterval: 1m
      connectTimeout: 30s
      readTimeout: 30s
</code></pre>
<blockquote> <p>Keep in mind that SFTP is known to be slow. <strong>Use it only as a last resort.</strong></p> </blockquote>
<h4>Configuration parameters:</h4>
<p><code>storage.binary.provider</code></p>
<p>(Required, String) <strong>Must</strong> be <code>sftp</code>.</p>
<p><code>storage.binary.dataPath</code></p>
<p>(Optional, String) The path where the data will be stored. Default is <code>/binary</code>.</p>
<p><code>storage.binary.server</code></p>
<p>(Optional, String) The URI to the SFTP server. Default is <code>sftp://pdp:pdp@localhost:22</code>.</p>
<p><code>storage.binary.connection.keepAliveInterval</code></p>
<p>(Optional, Duration) The interval at which a keep-alive signal is sent to the server. Default is <code>30s</code>.</p>
<p><code>storage.binary.connection.connectTimeout</code></p>
<p>(Optional, Duration) The timeout for the connection to the server. Default is <code>5s</code>.</p>
<p><code>storage.binary.connection.readTimeout</code></p>
<p>(Optional, Duration) The timeout for reading from the server. Default is <code>3s</code>.</p>
<h1>Cache Manager</h1>
<p>The local cache is an extension to the one provided by Micronaut and implemented with <a href="https://github.com/ben-manes/caffeine">Caffeine</a>.
It simplifies the process of caching entities with the use of annotations and configurations in the <code>application.yml</code>.</p>
<h2>Configuration</h2>
<pre><code class="language-yaml">storage:
  cache:
    local:
      myEntity:
        initialCapacity: 10
        maximumSize: 50
        maximumWeight: 100
        expireAfterWrite: 1h
        expireAfterAccess: 5m
        recordStats: true
        testMode: false
</code></pre>
<p>The example above configures a cache named <code>myEntity</code>. Each cache supports the following properties:</p>
<table>
<thead>
<tr> <th>Property</th> <th>Type</th> <th>Default</th> <th>Description</th> </tr>
</thead>
<tbody>
<tr> <td>initialCapacity</td> <td>Integer</td> <td>16</td> <td>The minimum size of the cache</td> </tr>
<tr> <td>maximumSize</td> <td>Long</td> <td></td> <td>The maximum size of the cache. <strong>Cannot be combined with a weigher</strong></td> </tr>
<tr> <td>maximumWeight</td> <td>Long</td> <td></td> <td>The maximum weight to be allowed for an element in the cache (see <a href="#weigher">Weigher</a> section)</td> </tr>
<tr> <td>expireAfterWrite</td> <td>Duration</td> <td></td> <td>The time to wait to expire an element after its creation</td> </tr>
<tr> <td>expireAfterAccess</td> <td>Duration</td> <td>5m</td> <td>The time to wait to expire an element after the last time it was accessed</td> </tr>
<tr> <td>recordStats</td> <td>boolean</td> <td>true</td> <td>To record statistics about hit rates and evictions (see <a href="#cache-statistics">Cache Statistics</a> section)</td> </tr>
<tr> <td>testMode</td> <td>boolean</td> <td>false</td> <td>To execute all cache operations in a single thread</td> </tr>
</tbody>
</table>
<blockquote> <p>Each cache <strong>must</strong> have a unique name, which will be automatically normalized to <em>kebab-case</em> (e.g.
<code>myEntity</code> becomes <code>my-entity</code>)</p> </blockquote> <p>A default configuration for a cache can be defined as a bean:</p> <pre><code class="language-java">@Factory @Requires(missingProperty = LocalCacheProperties.PREFIX + &quot;.&quot; + MyEntityCacheConfig.CACHE_NAME) public class MyEntityCacheConfig { public static final String CACHE_NAME = &quot;my-entity&quot;; @Bean @Named(CACHE_NAME) LocalCacheProperties cacheProperties(ApplicationConfiguration applicationConfiguration) { return LocalCacheProperties.builder() .cacheName(CACHE_NAME) .applicationConfiguration(applicationConfiguration) .build(); } } </code></pre> <h3>Weigher</h3> <p>A weigher determines if an element becomes too <em>heavy</em> to be in the cache. If for a cache's <code>maximumWeight</code> there is not a corresponding <strong>named</strong> weigher, any other weigher will be selected. If there is no registered weigher, a default weigher where every element has a weight of 1 will be used:</p> <pre><code class="language-java">import com.github.benmanes.caffeine.cache.*; @Singleton @Named(MyEntityCacheConfig.CACHE_NAME) public class MyEntityCacheWeigher implements Weigher&lt;UUID, MyEntity&gt; { @Override public @NonNegative int weigh(@NonNull UUID key, @NonNull MyEntity value) { return 0; } } </code></pre> <h3>Removal Listener</h3> <p>A listener that triggers every time an element is evicted:</p> <pre><code class="language-java">import com.github.benmanes.caffeine.cache.*; @Singleton @Named(MyEntityCacheConfig.CACHE_NAME) public class MyEntityCacheRemovalListener implements RemovalListener&lt;UUID, MyEntity&gt; { @Override public void onRemoval(@Nullable UUID key, @Nullable MyEntity value, @NonNull RemovalCause cause) { // Do something with the event } } </code></pre> <h2>Annotation-based caching</h2> <p>Any class (POJOs, Connections, Factories...) can be stored in the cache. 
For example, instances of the following entity type:</p> <pre><code class="language-java">@Data public class MyEntity implements CoreEntity&lt;UUID&gt; { private UUID id; private String name; ... } </code></pre> <p>can be cached in any managed bean with the use of <code>io.micronaut.cache.annotation</code> annotations:</p> <ul> <li><code>@Cacheable</code></li> <li><code>@CachePut</code></li> <li><code>@CacheInvalidate</code></li> </ul> <pre><code class="language-java">@Singleton @CacheConfig(MyEntityCacheConfig.CACHE_NAME) public class MyEntityService { @Inject protected MyEntityRepository myEntityRepository; @Cacheable(keyGenerator = CoreEntityKeyGenerator.class) public List&lt;MyEntity&gt; getAll() { return myEntityRepository.getAll(); } @Cacheable public MyEntity getOne(UUID id) { return myEntityRepository.getOne(id); } @CachePut(keyGenerator = CoreEntityKeyGenerator.class) public void store(MyEntity myEntity) { myEntityRepository.store(myEntity); } @CacheInvalidate public MyEntity delete(UUID id) { return myEntityRepository.delete(id); } } </code></pre> <blockquote> <p>The key for the cacheable object <strong>must</strong> implement <code>equals()</code> and <code>hashCode()</code></p> </blockquote> <p>Note that for the <code>getAll()</code> and <code>store(MyEntity)</code> methods, a custom key generator needs to be specified. This way the cache will calculate the appropriate <code>key</code> for each entity. If no generator is defined, the <code>DefaultCacheKeyGenerator</code> is used.</p> <blockquote> <p>The <code>CoreEntityKeyGenerator</code> can be used with any entity that implements <code>CoreEntity&lt;T&gt;</code></p> </blockquote> <p>Multiple caches can be configured in the same <code>@CacheConfig</code>, in which case, the name of the used cache <strong>must</strong> be specified. 
Likewise, the key for the cached value can be a composite of multiple objects (internally wrapped as a <code>ParametersKey</code> and generated by a <code>DefaultCacheKeyGenerator</code>):</p>
<pre><code class="language-java">@Singleton
@CacheConfig({ &quot;cacheA&quot;, &quot;cacheB&quot; })
public class MyMultiCacheService {

    @Cacheable(&quot;cacheA&quot;)
    public MyEntity getOneA(UUID id) {
        ...
    }

    @Cacheable(&quot;cacheB&quot;)
    public MyEntity getOneB(UUID id, UUID parentId) {
        ...
    }
}
</code></pre>
<h2>Cache Statistics</h2>
<p>If the cache statistics are enabled, they will be published as part of the application <a href="#metrics">metrics</a>:</p>
<ul>
<li><code>cache.eviction.weight</code> - the sum of weights of evicted entries</li>
<li><code>cache.evictions</code> - the count of cache evictions</li>
<li><code>cache.size</code> - the <strong>estimated</strong> number of entries in the cache</li>
<li><code>cache.gets</code> - the number of times a cache-annotated method has returned an item (regardless of whether it was cached or not). This metric can be refined with the use of tags:
<ul>
<li><code>result:hit</code> - the number of times cache lookup methods have returned a cached value</li>
<li><code>result:miss</code> - the number of times cache lookup methods have returned an uncached value</li>
</ul>
</li>
</ul>
<blockquote> <p>If the application has multiple caches, the metrics can be filtered with the <code>cache:my-entity</code> tag</p> </blockquote>
<h1>Collections</h1>
<h2>Flushable Collection</h2>
<p>A data structure that asynchronously flushes its content any time a preconfigured criterion is met. It is backed by an <code>ArrayList&lt;T&gt;</code> and it is guaranteed to be <em>thread-safe</em>.</p>
<h3>Configuration</h3>
<h4>Basic Properties</h4>
<table>
<thead>
<tr> <th>Property</th> <th>Type</th> <th>Default</th> <th>Description</th> </tr>
</thead>
<tbody>
<tr> <td>maxCount</td> <td>Integer</td> <td></td> <td>The maximum number of elements before flushing. Triggers a <a href="#flush-events">Count</a> flush</td> </tr>
<tr> <td>maxDataSize</td> <td>Long</td> <td></td> <td>The maximum size of the collection elements before flushing. Triggers a <a href="#flush-events">Data Size</a> flush</td> </tr>
<tr> <td>flushAfter</td> <td>Duration</td> <td></td> <td>The duration before flushing. Triggers a <a href="#flush-events">Scheduled</a> flush</td> </tr>
<tr> <td>threads</td> <td>Integer</td> <td>5</td> <td>The number of threads used to execute the flush event</td> </tr>
<tr> <td>flushTimeout</td> <td>Duration</td> <td>10m</td> <td>The timeout for the flush event</td> </tr>
</tbody>
</table>
<h5>Properties Template</h5>
<p>For a collection of type <code>my-collection</code>, a set of default properties can be defined as a bean or in the <code>application.yml</code>:</p>
<pre><code class="language-yaml">collections:
  flushable:
    myCollection:
      maxCount: 10
      maxDataSize: 1mb
      flushAfter: 5m
      threads: 10
      flushTimeout: 10m
</code></pre>
<p>The same configuration can be defined as:</p>
<pre><code class="language-java">@Factory
@Requires(missingProperty = FlushableCollectionProperties.PREFIX + &quot;.&quot; + MyFlushableCollectionConfig.TYPE)
public class MyFlushableCollectionConfig {
    public static final String TYPE = &quot;my-collection&quot;;

    @Bean
    @Named(TYPE)
    FlushableCollectionProperties collectionProperties() {
        return FlushableCollectionProperties.builder()
                .type(TYPE)
                .maxCount(10)
                .maxDataSize(DataSize.ofMegabytes(1).asBytes())
                .flushAfter(Duration.ofMinutes(5))
                .threads(10)
                .flushTimeout(Duration.ofMinutes(10))
                .build();
    }
}
</code></pre>
<blockquote> <p>Each collection definition <strong>must</strong> have a unique name, which will be automatically normalized to <em>kebab-case</em> (e.g. <code>myCollection</code> becomes <code>my-collection</code>)</p> </blockquote>
<h4>Action Handlers</h4>
<h5>Flush Handler</h5>
<p>Consumer to be called when the flush event is triggered.
For instance, a flush handler for a collection of integer elements can be defined as:</p>
<pre><code class="language-java">public class MyCollectionFlushHandler implements Consumer&lt;FlushableCollection.Batch&lt;Integer&gt;&gt; {
    @Override
    public void accept(FlushableCollection.Batch&lt;Integer&gt; batch) {
        // Do something with the batch
    }
}
</code></pre>
<p>By default, it does nothing: <code>builder.flushHandler(batch -&gt; {})</code></p>
<h5>Weigher</h5>
<p>Function to determine the size of an element. Required to trigger a <a href="#data-size-flush">Data Size</a> flush. For instance, a collection of integer elements can define its weigher as:</p>
<pre><code class="language-java">public class MyCollectionWeigher implements Function&lt;Integer, Long&gt; {
    @Override
    public Long apply(Integer element) {
        // The cast is required: an int cannot be boxed directly into a Long
        return (long) element.toString().length();
    }
}
</code></pre>
<p>By default, the weight is calculated after converting the element to String and counting its number of bytes using <em>UTF-8</em> encoding.</p>
<h5>Success Handler</h5>
<p>Consumer to be executed if the <a href="#flush-handler">flush handler</a> was successfully executed. For instance, a success handler for a collection of integer elements can be defined as:</p>
<pre><code class="language-java">public class MyCollectionSuccessHandler implements Consumer&lt;FlushableCollection.Batch&lt;Integer&gt;&gt; {
    @Override
    public void accept(FlushableCollection.Batch&lt;Integer&gt; batch) {
        // Do something with the successful batch
    }
}
</code></pre>
<p>By default, it logs a debug message with the details of the processed batch: <code>builder.successHandler(batch -&gt; log.debug(...))</code></p>
<h5>Failure Handler</h5>
<p>BiConsumer to be executed if the <a href="#flush-handler">flush handler</a> failed.
For instance, a failure handler for a collection of integer elements can be defined as:</p>
<pre><code class="language-java">public class MyCollectionFailureHandler implements BiConsumer&lt;FlushableCollection.Batch&lt;Integer&gt;, Throwable&gt; {
    @Override
    public void accept(FlushableCollection.Batch&lt;Integer&gt; batch, Throwable ex) {
        // Do something with the failed batch
    }
}
</code></pre>
<p>By default, it logs an error message with the details of the batch and its exception: <code>builder.failureHandler((batch, ex) -&gt; log.error(...))</code>. If the flush event causes a timeout, the input <code>Throwable</code> will be of type <code>java.util.concurrent.TimeoutException</code>.</p>
<h3>Usage</h3>
<p>The collection <strong>must</strong> be created through the <code>FlushableCollectionFactory</code> bean, by providing the expected type. If the application context finds a <code>FlushableCollectionProperties</code> with the same name, it will be used as a template for the new collection. Otherwise, a new properties set with default values will be created. Note that any pre-defined property can be overridden during the <em>build</em> phase.</p>
<pre><code class="language-java">@Inject
protected FlushableCollectionFactory flushableCollectionFactory;

void submit() {
    try (var collection = flushableCollectionFactory.&lt;Integer&gt;builder(&quot;my-collection&quot;)
            .flushHandler(new MyCollectionFlushHandler())
            .weigher(new MyCollectionWeigher())
            .successHandler(new MyCollectionSuccessHandler())
            .failureHandler(new MyCollectionFailureHandler())
            .build()) {
        for (int i = 0; i &lt; 10; i++) {
            collection.add(i);
        }
    }
}
</code></pre>
<h3>Flush Events</h3>
<ul> <li><code>COUNT</code> - Triggers if the collection contains more elements than the value defined in the <code>maxCount</code> property.
If undefined, it will never be triggered.</li> <li><code>DATA_SIZE</code> - Triggers if the size of the elements in the collection is greater than the value defined in the <code>maxDataSize</code> property. If undefined, it will never be triggered.</li> <li><code>SCHEDULED</code> - Triggers based on the schedule defined with the <code>flushAfter</code> property. If undefined, it will never be triggered.</li> <li><code>MANUAL</code> - Triggers whenever the <code>collection.flush()</code> method is called.</li> <li><code>CLOSE</code> - Triggers whenever the collection is closed by either using a <code>try</code> with resources, or by calling the <code>collection.close()</code> method.</li> </ul> <h3>Flush Metrics</h3> <p>Each collection will publish metrics about the duration of each flush event, its size, and the count of success/failures (see <a href="#metrics">Metrics</a> section).</p> <p>This can be refined with the use of custom tags during the creation of the collection:</p> <pre><code class="language-java">try (var collection = flushableCollectionFactory.&lt;Integer&gt;builder(&quot;my-collection&quot;) .tag(&quot;key&quot;, &quot;value&quot;) .build()) { // Do something with the collection } </code></pre> <p>The metrics will then be available in:</p> <ul> <li><code>GET /metrics/pdp.collections.flushable.[type]</code> - The count of successful and failed flush events.</li> <li><code>GET /metrics/pdp.collections.flushable.[type].duration</code> - The duration for the flush handler.</li> <li><code>GET /metrics/pdp.collections.flushable.[type].size</code> - The size of the flushed elements.</li> </ul> <h1>DSL</h1> <p>The PDP DSL is an abstract definition for a common language to be used in any PDP product. Its intention is to have a standardized way to express configurations that might be interpreted in a different way according to the needs of the product itself.</p> <h2>Filters</h2> <p>A filter is a criteria to be applied to a given object. 
In order to use it, the <code>FilterAdapter</code> interface needs to be implemented. The core supports the following concrete adapters:</p> <ul> <li><code>MapFilterAdapter</code> - Converts the filter into a predicate used to evaluate <code>Map&lt;String, Object&gt;</code> structures. The expected field name is the key of the map.</li> <li><code>JsonPathFilterAdapter</code> - Converts the filter into a predicate used to evaluate <code>DocumentContext</code> structures. The expected field name is a <a href="https://goessner.net/articles/JsonPath">JSON Path</a> to be found in the JSON document.</li> <li><code>JsonPointerFilterAdapter</code> - Converts the filter into a predicate used to evaluate <code>ObjectNode</code> structures. The expected field name is a <a href="https://www.rfc-editor.org/rfc/rfc6901">JSON Pointer</a> to be found in the JSON document.</li> </ul> <blockquote> <p>All filters have an optional <code>source</code> field that could be used by the concrete implementation to select among multiple data structures</p> </blockquote> <h3>&quot;Equals&quot; Filter</h3> <p>The value of the field must be exactly as the one provided.</p> <pre><code class="language-java">var filter = EqualsFilter.builder().field(&quot;field&quot;).value(&quot;value&quot;).build(); </code></pre> <pre><code class="language-json">{ &quot;equals&quot;: { &quot;field&quot;: &quot;field&quot;, &quot;value&quot;: &quot;value&quot; } } </code></pre> <h3>&quot;Greater Than&quot; Filter</h3> <p>The value of the field must be greater than the one provided.</p> <pre><code class="language-java">var filter = GreaterThanFilter.builder().field(&quot;field&quot;).value(1).build(); </code></pre> <pre><code class="language-json">{ &quot;gt&quot;: { &quot;field&quot;: &quot;field&quot;, &quot;value&quot;: 1 } } </code></pre> <h3>&quot;Greater Than or Equals&quot; Filter</h3> <p>The value of the field must be greater than or equals to the one provided.</p> <pre><code class="language-java">var 
filter = GreaterThanOrEqualsFilter.builder().field(&quot;field&quot;).value(1).build(); </code></pre> <pre><code class="language-json">{ &quot;gte&quot;: { &quot;field&quot;: &quot;field&quot;, &quot;value&quot;: 1 } } </code></pre> <h3>&quot;Less Than&quot; Filter</h3> <p>The value of the field must be less than the one provided.</p> <pre><code class="language-java">var filter = LessThanFilter.builder().field(&quot;field&quot;).value(1).build(); </code></pre> <pre><code class="language-json">{ &quot;lt&quot;: { &quot;field&quot;: &quot;field&quot;, &quot;value&quot;: 1 } } </code></pre> <h3>&quot;Less Than or Equals&quot; Filter</h3> <p>The value of the field must be less than or equals to the one provided.</p> <pre><code class="language-java">var filter = LessThanOrEqualsFilter.builder().field(&quot;field&quot;).value(1).build(); </code></pre> <pre><code class="language-json">{ &quot;lte&quot;: { &quot;field&quot;: &quot;field&quot;, &quot;value&quot;: 1 } } </code></pre> <h3>&quot;In&quot; Filter</h3> <p>The value of the field must be one of the provided values.</p> <pre><code class="language-java">var filter = InFilter.builder().field(&quot;field&quot;).value(&quot;valueA&quot;).value(&quot;valueB&quot;).build(); </code></pre> <pre><code class="language-json">{ &quot;in&quot;: { &quot;field&quot;: &quot;field&quot;, &quot;values&quot;: [ &quot;valueA&quot;, &quot;valueB&quot; ] } } </code></pre> <h3>&quot;Empty&quot; Filter</h3> <p>Checks if a field is empty:</p> <ul> <li>For a collection, <code>true</code> if its size is 0</li> <li>For a String, <code>true</code> if its length is 0</li> <li>For any other type, <code>true</code> if it is <code>null</code></li> </ul> <pre><code class="language-java">var filter = EmptyFilter.builder() .field(&quot;field&quot;) .build(); </code></pre> <pre><code class="language-json">{ &quot;empty&quot;: { &quot;field&quot;: &quot;field&quot; } } </code></pre> <h3>&quot;Exists&quot; Filter</h3> <p>Checks if a field exists.</p> 
<pre><code class="language-java">var filter = ExistsFilter.builder() .field(&quot;field&quot;) .build(); </code></pre> <pre><code class="language-json">{ &quot;exists&quot;: { &quot;field&quot;: &quot;field&quot; } } </code></pre> <h3>&quot;Not&quot; Filter</h3> <p>Negates the inner clause.</p> <pre><code class="language-java">var filter = NotFilter.builder() .clause(EqualsFilter.builder().field(&quot;field&quot;).value(&quot;value&quot;).build()) .build(); </code></pre> <pre><code class="language-json">{ &quot;not&quot;: { &quot;equals&quot;: { &quot;field&quot;: &quot;field&quot;, &quot;value&quot;: &quot;value&quot; } } } </code></pre> <h3>&quot;Null&quot; Filter</h3> <p>Checks if a field is null. Note that while the &quot;exists&quot; filter checks whether the field is present or not, the &quot;null&quot; filter expects the field to be present but with <code>null</code> value.</p> <pre><code class="language-java">var filter = NullFilter.builder() .field(&quot;field&quot;) .build(); </code></pre> <pre><code class="language-json">{ &quot;null&quot;: { &quot;field&quot;: &quot;field&quot; } } </code></pre> <h3>Boolean Operators</h3> <h4>&quot;And&quot; Filter</h4> <p>All conditions in the list <strong>must</strong> be evaluated to <code>true</code>.</p> <pre><code class="language-java">var filter = AndFilter.builder() .clause(EqualsFilter.builder().field(&quot;fieldA&quot;).value(&quot;valueA&quot;).build()) .clause(EqualsFilter.builder().field(&quot;fieldB&quot;).value(&quot;valueB&quot;).build()) .build(); </code></pre> <pre><code class="language-json">{ &quot;and&quot;: [ { &quot;equals&quot;: { &quot;field&quot;: &quot;fieldA&quot;, &quot;value&quot;: &quot;valueA&quot; } }, { &quot;equals&quot;: { &quot;field&quot;: &quot;fieldB&quot;, &quot;value&quot;: &quot;valueB&quot; } } ] } </code></pre> <h4>&quot;Or&quot; Filter</h4> <p>At least one condition in the list <strong>must</strong> be evaluated to <code>true</code>.</p> <pre><code class="language-java">var 
filter = OrFilter.builder()
        .clause(EqualsFilter.builder().field(&quot;fieldA&quot;).value(&quot;valueA&quot;).build())
        .clause(EqualsFilter.builder().field(&quot;fieldB&quot;).value(&quot;valueB&quot;).build())
        .build();
</code></pre>
<pre><code class="language-json">{
  &quot;or&quot;: [
    { &quot;equals&quot;: { &quot;field&quot;: &quot;fieldA&quot;, &quot;value&quot;: &quot;valueA&quot; } },
    { &quot;equals&quot;: { &quot;field&quot;: &quot;fieldB&quot;, &quot;value&quot;: &quot;valueB&quot; } }
  ]
}
</code></pre>
<h1>HTTP Base Client</h1>
<p>The HTTP Base Client is an abstraction of the <a href="https://square.github.io/okhttp/4.x/okhttp/okhttp3/-ok-http-client/">OkHttpClient</a>. It provides features such as retry handlers and automated paginated requests.</p>
<p>The minimum initialization for the client defines the configuration of the HTTP connections and other shared properties to be used by the endpoints:</p>
<pre><code class="language-java">public class MyClient extends HttpBaseClient {

    // The constructor parameters match the arguments passed by Builder#build()
    protected MyClient(
            @NonNull OkHttpClient client,
            @NonNull ObjectMapper objectMapper,
            @NonNull BackoffPolicyProperties backoffPolicy,
            @NonNull ScrollProperties scroll) {
        super(client, objectMapper, backoffPolicy, scroll);
    }

    @Override
    public boolean ping() {
        return true;
    }

    public static Builder builder() {
        return new Builder();
    }

    public static final class Builder extends HttpBaseClientBuilder&lt;Builder, MyClient&gt; {
        @Override
        public MyClient build() {
            return new MyClient(
                    newClientBuilder().build(),
                    getObjectMapper(),
                    getBackoffPolicy(),
                    getScroll());
        }
    }
}
</code></pre>
<p>The builder can also be initialized with a <code>HttpBaseClientProperties</code> instance:</p>
<pre><code class="language-java">public class MyClient extends HttpBaseClient {
    ...
    public static Builder builder(HttpBaseClientProperties clientProperties) {
        return new Builder(clientProperties);
    }

    @NoArgsConstructor
    public static final class Builder extends HttpBaseClientBuilder&lt;Builder, MyClient&gt; {
        protected Builder(HttpBaseClientProperties clientProperties) {
            super(clientProperties);
        }
        ...
    }
}
</code></pre>
<p>If some configurations are not desired on the implemented client, they can be disabled in the builder:</p>
<pre><code class="language-java">public class MyClient extends HttpBaseClient {
    ...
    @HttpBaseClientBuilder.DisabledConfig({
            HttpBaseClientBuilder.Config.COMPRESS_REQUESTS,
            HttpBaseClientBuilder.Config.FOLLOW_REDIRECTS,
            HttpBaseClientBuilder.Config.SCROLL
    })
    public static final class Builder extends HttpBaseClientBuilder&lt;Builder, MyClient&gt; {
        ...
    }
}
</code></pre>
<p>Any attempt to set a disabled configuration will throw an <code>UnsupportedOperationException</code>.</p>
<p>By default, the <code>executeWithBackoff(...)</code> method retries the request if the HTTP response code is:</p>
<ul>
<li>408 - Request Timeout</li>
<li>429 - Too Many Requests</li>
<li>500 - Internal Server Error</li>
<li>504 - Gateway Timeout</li>
</ul>
<p>The condition can be changed by passing a different predicate to the method:</p>
<pre><code class="language-java">client.executeWithBackoff(requestDetails, response -&gt; false);
</code></pre>
<p>In this case, the request will not be retried because of a status code (although it might be retried because of an exception during the execution).</p>
<h2>HTTP Round Robin Client</h2>
<p>The HTTP Round Robin Client is a specialization of the <code>HttpBaseClient</code> backed by a round-robin collection.
It provides the corresponding classes as described in the previous section: <code>HttpRoundRobinClient</code>, <code>HttpRoundRobinClientBuilder</code>, and <code>HttpRoundRobinClientProperties</code>.</p>
<h2>HTTP Client Request</h2>
<p>The request object is a simple POJO with the properties needed to execute the HTTP call:</p>
<pre><code class="language-java">@Getter // exposes getId(), which the client examples call
@RequiredArgsConstructor
public class MyRequest implements HttpClientRequest {
    private final String id;
}
</code></pre>
<blockquote> <p>For cases when no data is needed in the request, an empty implementation is available in <code>HttpRoundRobinClient#EMPTY_REQUEST</code></p> </blockquote>
<h3>HTTP Client Pageable Request</h3>
<p>The <code>HttpClientPageableRequest</code> interface is an extension of <code>HttpClientRequest</code> and defines default methods to handle pagination parameters.</p>
<h2>HTTP Client Response</h2>
<h3>Single Response</h3>
<p>Expected when the response is a single payload.</p>
<pre><code class="language-java">public class MySinglePayloadResponse extends AbstractSingleResponse&lt;MyRequest&gt; {
    public MySinglePayloadResponse(
            ObjectMapper objectMapper,
            HttpClientRequestDetails&lt;MyRequest&gt; requestDetails,
            Response httpResponse
    ) {
        super(objectMapper, requestDetails, httpResponse);
    }

    @Override
    protected void handleSuccess(Response httpResponse) {
        // Do something with the response
    }
}
</code></pre>
<p>With this definition, the endpoint can be exposed in the client:</p>
<pre><code class="language-java">public class MyClient extends HttpRoundRobinClient {
    ...
    public MySinglePayloadResponse get(MyRequest clientRequest) {
        var url = serverHosts.next()
                .newBuilder()
                .addPathSegment(&quot;content&quot;)
                .addPathSegment(clientRequest.getId())
                .build();

        var requestDetails = new HttpClientRequestDetails&lt;&gt;(
                new Request.Builder().url(url).get().build(),
                clientRequest);

        return new MySinglePayloadResponse(
                objectMapper,
                requestDetails,
                executeWithBackoff(requestDetails)
        );
    }
    ...
} </code></pre> <p>Both <code>handleSuccess(Response)</code> and <code>handleFailure(Response)</code> have a default implementation that can be overridden. A response is considered a success if its HTTP status code is in the 200 to 299 range and its body is not <code>null</code> (not to be confused with an empty body). This check can be changed in the constructor:</p> <pre><code class="language-java">public class MySinglePayloadResponse extends AbstractSingleResponse&lt;MyRequest&gt; { public MySinglePayloadResponse( ObjectMapper objectMapper, HttpClientRequestDetails&lt;MyRequest&gt; requestDetails, Response httpResponse ) { super(objectMapper, requestDetails, httpResponse, r -&gt; r.code() == 400); } } </code></pre> <p>If the response was a failure, but it had a payload, it can be retrieved with <code>AbstractSingleResponse#getErrorMessage()</code>, or <code>AbstractSingleResponse#getErrorMessage(Class)</code> (to convert the payload into the given class type).</p> <h3>Iterable Response</h3> <p>Expected when the response is a full collection of iterable elements with no pagination. It is assumed that the payload of the response is a JSON array (if that's not the case, the <code>handleSuccess(Response httpResponse)</code> method <strong>must</strong> be overridden).</p> <pre><code class="language-java">public class MyIterableResponse extends AbstractIterableResponse&lt;MyRequest, String&gt; { public MyIterableResponse( ObjectMapper objectMapper, HttpClientRequestDetails&lt;MyRequest&gt; requestDetails, Response httpResponse ) { super(objectMapper, requestDetails, httpResponse); } @Override protected String parseElement(JsonNode jsonNode) { return jsonNode.asText(); } } </code></pre> <p>With this definition, the endpoint can be exposed in the client:</p> <pre><code class="language-java">public class MyClient extends HttpRoundRobinClient { ... 
    public MyIterableResponse list(MyRequest clientRequest) {
        var url = serverHosts.next()
                .newBuilder()
                .addPathSegment(&quot;list&quot;)
                .addPathSegment(clientRequest.getId())
                .build();

        var requestDetails = new HttpClientRequestDetails&lt;&gt;(
                new Request.Builder().url(url).get().build(),
                clientRequest);

        return new MyIterableResponse(
                objectMapper,
                requestDetails,
                executeWithBackoff(requestDetails)
        );
    }
    ...
}
</code></pre>
<p>Each element in the response can then be retrieved by using the <code>Iterable&lt;&gt;</code> interface.</p>
<p>The predicate to determine if a response was successful can be provided in the constructor, the same way as in the previous sections.</p>
<h3>Paginated Response</h3>
<h4>Token Page</h4>
<p>Expected when the response is a consecutive list of pages obtained with a token.</p>
<h5>Token Page Response</h5>
<p>The response corresponds to each page from the pagination requests. Since it is not possible to automatically determine where the token for the next page is located, the <code>AbstractTokenPageResponse#handleSuccess(Response)</code> method <strong>must</strong> be implemented.</p>
<pre><code class="language-java">public class MyTokenPageResponse extends AbstractTokenPageResponse&lt;MyRequest, String&gt; {
    public MyTokenPageResponse(
            ObjectMapper objectMapper,
            HttpClientRequestDetails&lt;MyRequest&gt; requestDetails,
            Response httpResponse,
            String currentToken
    ) {
        super(objectMapper, requestDetails, httpResponse, currentToken);
    }

    @Override
    protected void handleSuccess(Response response) {
        try {
            var json = objectMapper.readTree(Objects.requireNonNull(response.body()).byteStream());
            this.nextPageReference = json.get(&quot;token&quot;).asText();
            if (json.has(&quot;content&quot;)) {
                this.elements = new JsonLazyIterator&lt;&gt;(json.get(&quot;content&quot;), this::parseElement);
            } else {
                this.elements = Collections.emptyIterator();
            }
        } catch (Exception ex) {
            throw new DataException(ex.getMessage(), ex);
        }
    }

    @Override
    protected String parseElement(JsonNode
jsonNode) {
    return jsonNode.asText();
  }
}
</code></pre>
<p>The predicate to determine if a response was successful can be provided in the constructor, the same way as in the previous sections.</p>
<h5>Token Page Collection</h5>
<p>The response corresponds to a collection of all the page responses that iterates through the data.</p>
<pre><code class="language-java">public class MyTokenPageCollectionResponse extends AbstractTokenCollectionResponse&lt;MyRequest, String, MyTokenPageResponse&gt; {

  public MyTokenPageCollectionResponse(Function&lt;String, MyTokenPageResponse&gt; pageFunction) {
    super(pageFunction);
  }
}
</code></pre>
<p>With this definition, the endpoint can be exposed in the client:</p>
<pre><code class="language-java">public class MyClient extends HttpRoundRobinClient {
  ...
  public MyTokenPageCollectionResponse paginatedByToken(MyRequest clientRequest) {
    return new MyTokenPageCollectionResponse(
        token -&gt; {
          var urlBuilder = serverHosts.next()
              .newBuilder()
              .addPathSegment(&quot;list&quot;)
              .addPathSegment(clientRequest.getId())
              .addQueryParameter(&quot;limit&quot;, String.valueOf(scroll.getSize()));

          // A null token means the first page is being requested
          if (token != null) {
            urlBuilder.addQueryParameter(&quot;token&quot;, token);
          }

          var requestDetails = new HttpClientRequestDetails&lt;&gt;(
              new Request.Builder().url(urlBuilder.build()).get().build(),
              clientRequest);

          return new MyTokenPageResponse(
              objectMapper,
              requestDetails,
              executeWithBackoff(requestDetails),
              token
          );
        }
    );
  }
  ...
}
</code></pre>
<p>Note that in case of a <code>null</code> token, the function <strong>must</strong> return the first page.</p>
<p>Each element in the response can then be retrieved by using the <code>Iterable&lt;&gt;</code> interface. The pagination will be handled behind the scenes.</p>
<h4>Offset Page</h4>
<p>Expected when the response is a consecutive list of pages obtained with a page number.</p>
<h5>Offset Page Response</h5>
<p>The response corresponds to each page from the pagination requests. 
It is assumed that the payload of the response is a JSON array (if that's not the case, the <code>handleSuccess(Response httpResponse)</code> method <strong>must</strong> be overridden).</p>
<p>It is also assumed that <code>AbstractOffsetPageResponse#getNextPageReference()</code> always returns the current page plus 1. This method can be overridden for custom behaviors.</p>
<pre><code class="language-java">public class MyOffsetPageResponse extends AbstractOffsetPageResponse&lt;MyRequest, String&gt; {

  public MyOffsetPageResponse(
      ObjectMapper objectMapper,
      HttpClientRequestDetails&lt;MyRequest&gt; requestDetails,
      Response httpResponse,
      Long currentOffset
  ) {
    super(objectMapper, requestDetails, httpResponse, currentOffset);
  }

  @Override
  protected String parseElement(JsonNode jsonNode) {
    return jsonNode.asText();
  }
}
</code></pre>
<p>The predicate to determine if a response was successful can be provided in the constructor, the same way as in the previous sections.</p>
<h5>Pageable Page Response</h5>
<p>The response is a specialization of the <em>Offset Page Response</em>, and it is based on a default page structure:</p>
<pre><code class="language-json">{
  &quot;totalPages&quot;: 0,
  &quot;totalElements&quot;: 0,
  &quot;size&quot;: 0,
  &quot;content&quot;: [
    {}
  ],
  &quot;number&quot;: 0,
  &quot;first&quot;: true,
  &quot;last&quot;: true,
  &quot;sort&quot;: {
    &quot;sorted&quot;: true,
    &quot;unsorted&quot;: true,
    &quot;empty&quot;: true
  },
  &quot;numberOfElements&quot;: 0,
  &quot;pageable&quot;: {
    &quot;page&quot;: 0,
    &quot;size&quot;: 0,
    &quot;sort&quot;: [
      &quot;string&quot;
    ]
  },
  &quot;empty&quot;: true
}
</code></pre>
<h5>Offset Page Collection</h5>
<p>The response corresponds to a collection of all the page responses that iterates through the data.</p>
<pre><code class="language-java">public class MyOffsetPageCollectionResponse extends AbstractOffsetCollectionResponse&lt;MyRequest, String, MyOffsetPageResponse&gt; {

  public MyOffsetPageCollectionResponse(Function&lt;Long, 
MyOffsetPageResponse&gt; pageFunction) {
    super(pageFunction);
  }
}
</code></pre>
<p>With this definition, the endpoint can be exposed in the client:</p>
<pre><code class="language-java">public class MyClient extends HttpRoundRobinClient {
  ...
  public MyOffsetPageCollectionResponse paginatedByOffset(MyRequest clientRequest) {
    return new MyOffsetPageCollectionResponse(
        offset -&gt; {
          var urlBuilder = serverHosts.next()
              .newBuilder()
              .addPathSegment(&quot;list&quot;)
              .addPathSegment(clientRequest.getId())
              .addQueryParameter(&quot;limit&quot;, String.valueOf(scroll.getSize()));

          // A null offset means the first page is being requested
          if (offset != null) {
            urlBuilder.addQueryParameter(&quot;page&quot;, String.valueOf(offset));
          }

          var requestDetails = new HttpClientRequestDetails&lt;&gt;(
              new Request.Builder().url(urlBuilder.build()).get().build(),
              clientRequest);

          return new MyOffsetPageResponse(
              objectMapper,
              requestDetails,
              executeWithBackoff(requestDetails),
              offset
          );
        }
    );
  }
  ...
}
</code></pre>
<p>Note that in case of a <code>null</code> offset, the function <strong>must</strong> return the first page.</p>
<p>Each element in the response can then be retrieved by using the <code>Iterable&lt;&gt;</code> interface. The pagination will be handled behind the scenes.</p>
<h1>JSON</h1>
<p>JSON handling is done with the <a href="https://github.com/FasterXML/jackson">Jackson Project</a>, and the default configuration for the mapper can be instantiated with <code>com.pureinsights.pdp.core.config.CoreSerializer.newInstance()</code>.</p>
<h2>Default Object Mapper</h2>
<h3>Registered Modules</h3>
<ul>
<li><code>JavaTimeModule</code> to handle the <a href="https://jcp.org/en/jsr/detail?id=310">JSR-310: Date and Time API</a></li>
</ul>
<h3>Registered Deserializers</h3>
<ul>
<li><code>com.pureinsights.pdp.core.time.CoreDurationDeserializer</code> as an extension of the default <code>DurationDeserializer</code> to allow values in human-readable format (e.g. &quot;1s&quot;, &quot;12m&quot;, &quot;5h&quot;...). 
<strong>If no suffix is provided, milliseconds are assumed</strong>. The supported suffixes are:
<ul>
<li><code>d</code> - days</li>
<li><code>h</code> - hours</li>
<li><code>m</code> - minutes</li>
<li><code>s</code> - seconds</li>
<li><code>ms</code> - milliseconds</li>
<li><code>ns</code> - nanoseconds</li>
</ul>
</li>
</ul>
<h3>Enabled Features</h3>
<ul>
<li><code>MapperFeature.ACCEPT_CASE_INSENSITIVE_ENUMS</code></li>
<li><code>MapperFeature.ACCEPT_CASE_INSENSITIVE_VALUES</code></li>
<li><code>DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY</code></li>
</ul>
<h3>Disabled Features</h3>
<ul>
<li><code>SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS</code></li>
<li><code>SerializationFeature.WRITE_DATES_AS_TIMESTAMPS</code></li>
<li><code>DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES</code></li>
<li><code>DeserializationFeature.ADJUST_DATES_TO_CONTEXT_TIME_ZONE</code></li>
</ul>
<h1>Logging Handler</h1>
<p>The logging handler simplifies the process of log analysis by using a JSON output with support for custom POJOs.</p>
<h2>Logback Configuration</h2>
<pre><code class="language-xml">&lt;configuration&gt;
  &lt;appender name=&quot;STDOUT&quot; class=&quot;ch.qos.logback.core.ConsoleAppender&quot;&gt;
    &lt;encoder class=&quot;net.logstash.logback.encoder.LogstashEncoder&quot;&gt;
      &lt;provider class=&quot;com.pureinsights.pdp.core.logging.CoreArgumentsProvider&quot;/&gt;
    &lt;/encoder&gt;
  &lt;/appender&gt;

  &lt;root level=&quot;info&quot;&gt;
    &lt;appender-ref ref=&quot;STDOUT&quot;/&gt;
  &lt;/root&gt;
&lt;/configuration&gt;
</code></pre>
<h2>Custom POJO</h2>
<pre><code class="language-java">@Getter
public class MyPOJO {
  private final UUID id = UUID.randomUUID();
}
</code></pre>
<pre><code class="language-java">public class MyPOJOLoggingHandler implements PojoLoggingHandler&lt;MyPOJO&gt; {
  private static final String PARENT_LABEL = &quot;myPOJO&quot;;
  private static final String ID_LABEL = &quot;id&quot;;

  @Override
  public Class&lt;MyPOJO&gt; getSupportedType() { 
    return MyPOJO.class;
  }

  @Override
  public void write(JsonGenerator generator, MyPOJO pojo) throws IOException {
    // Writes the POJO as a nested object under the "myPOJO" field
    generator.writeFieldName(PARENT_LABEL);
    generator.writeStartObject();
    generator.writeObjectField(ID_LABEL, pojo.getId());
    generator.writeEndObject();
  }
}
</code></pre>
<p>To invoke the handler, pass an instance of the supported type as an argument to the logger:</p>
<pre><code class="language-java">log.info(&quot;Message&quot;, new MyPOJO());
log.info(&quot;Message {}&quot;, new MyPOJO());
log.info(&quot;Message {}&quot;, &quot;with placeholder&quot;, new MyPOJO());
</code></pre>
<pre><code class="language-json">{&quot;@timestamp&quot;:&quot;2021-12-08T15:07:11.379Z&quot;,&quot;@version&quot;:1,&quot;message&quot;:&quot;Message&quot;,&quot;logger_name&quot;:&quot;LoggerTest&quot;,&quot;thread_name&quot;:&quot;Test worker&quot;,&quot;level&quot;:&quot;INFO&quot;,&quot;level_value&quot;:20000,&quot;myPOJO&quot;:{&quot;id&quot;:&quot;707ea126-d94b-47b6-9010-72a3acc8a786&quot;}}
{&quot;@timestamp&quot;:&quot;2021-12-08T15:07:11.380Z&quot;,&quot;@version&quot;:1,&quot;message&quot;:&quot;Message MyPOJO$373141eb&quot;,&quot;logger_name&quot;:&quot;LoggerTest&quot;,&quot;thread_name&quot;:&quot;Test worker&quot;,&quot;level&quot;:&quot;INFO&quot;,&quot;level_value&quot;:20000,&quot;myPOJO&quot;:{&quot;id&quot;:&quot;398bb1df-29b3-4738-afc6-a7b9e892ce19&quot;}}
{&quot;@timestamp&quot;:&quot;2021-12-08T15:07:11.381Z&quot;,&quot;@version&quot;:1,&quot;message&quot;:&quot;Message with placeholder&quot;,&quot;logger_name&quot;:&quot;LoggerTest&quot;,&quot;thread_name&quot;:&quot;Test worker&quot;,&quot;level&quot;:&quot;INFO&quot;,&quot;level_value&quot;:20000,&quot;myPOJO&quot;:{&quot;id&quot;:&quot;00ba3228-f434-4ed0-b45e-62ec9163aac4&quot;}}
</code></pre>
<p>Note that the custom POJO is always displayed as expected. 
The example also demonstrates that the POJO <strong>must not</strong> have a corresponding placeholder in the message, as it will be automatically resolved by the provider. Exceptions <strong>must</strong> be in the last position of the arguments:</p>
<pre><code class="language-java">...
} catch (Exception ex) {
  log.error(ex.getMessage(), new MyPOJO(), ex);
}
</code></pre>
<p>If the logging handler is part of the <em>PDP Core Common Libraries</em> project, it <strong>must</strong> be registered in the static section of the <code>com.pureinsights.pdp.core.logging.CoreArgumentsProvider</code> class.</p>
<p>Any other logging handler under an application context <strong>must</strong> be declared as a bean (e.g. <code>@Singleton</code>).</p>
<p>If none of the above is applicable, the handler <strong>must</strong> be registered using the <code>CoreArgumentsProvider#registerHandler(PojoLoggingHandler)</code> method.</p>
<h1>Metrics</h1>
<p>The metrics of the application are collected and exposed by <a href="https://micrometer.io/">Micrometer</a>. 
They are enabled by default in the <code>application.yml</code> and exposed through the <code>GET /metrics</code> and <code>GET /metrics/[name]</code> endpoints:</p>
<pre><code class="language-yaml">micronaut:
  metrics:
    enabled: true
</code></pre>
<pre><code class="language-json">{
  &quot;names&quot;: [
    &quot;http.server.requests&quot;,
    &quot;jvm.buffer.count&quot;,
    &quot;jvm.buffer.memory.used&quot;,
    ...,
    &quot;pdp.metric.count&quot;
  ]
}
</code></pre>
<blockquote>
<p>When available, the metrics can be filtered with the use of tags: <code>GET /metrics/pdp.metric.count?tag=source:component&amp;tag=type:value1</code></p>
</blockquote>
<blockquote>
<p>By default, all metrics include the <code>component</code> tag with the name of the application</p>
</blockquote>
<h2>Annotation-based metrics</h2>
<p>Any method in any bean can be measured with:</p>
<ul>
<li><code>@Timed</code></li>
<li><code>@Counted</code></li>
</ul>
<pre><code class="language-java">@Singleton
public class MyService {

  @Timed(value = &quot;pdp.myservice.execute&quot;, histogram = true)
  public void execute(UUID id) {
    ...
  }

  @Counted(&quot;pdp.myservice.validate&quot;)
  public void validate(UUID id) {
    ... 
  }
}
</code></pre>
<blockquote>
<p>The name of the metric <strong>must</strong> be in lowercase, using dot notation</p>
</blockquote>
<h2>Custom metrics</h2>
<p>Instead of using annotations, the metric registry can be injected and manually handled in any bean, with any type of <a href="https://micrometer.io/docs/concepts">meter</a>:</p>
<pre><code class="language-java">@Singleton
public class MyCustomService {
  private Counter counter;

  @PostConstruct
  void init(MeterRegistry meterRegistry) {
    counter = Counter.builder(&quot;pdp.mycustomservice.count&quot;)
        .tag(&quot;key&quot;, &quot;value&quot;)
        .register(meterRegistry);
  }

  public void count() {
    counter.increment();
  }
}
</code></pre>
<h1>Retries</h1>
<p>As its name suggests, <code>RetryUtils</code> provides utility methods to retry a <code>Runnable</code> or <code>Supplier</code>. It also handles the exceptions that might occur during the run.</p>
<p>By default, <code>RetryUtils</code> uses a <code>ConstantDelayRetry</code>, which retries 3 times with a delay of 5 seconds.</p>
<pre><code class="language-java">...
AtomicInteger atomicInteger = new AtomicInteger();
RetryUtils.retry(atomicInteger::getAndIncrement);
</code></pre>
<p>A custom retry policy can also be passed as a parameter.</p>
<pre><code class="language-java">...
AtomicInteger atomicInteger = new AtomicInteger();
RetryUtils.retry(atomicInteger::getAndIncrement, new ConstantDelayRetry(2, 100));
</code></pre>
<p>If an exception is thrown during an attempt, <code>RetryUtils</code> handles it and retries. When the number of retries reaches the limit specified in the retry policy, a <code>CoreException</code> is thrown.</p>
<h1>Scheduler</h1>
<p>Scheduled tasks are handled by <a href="http://www.quartz-scheduler.org/">Quartz</a>. 
Any application can inject the <code>org.quartz.Scheduler</code> bean to register/unregister its own cron jobs.</p>
<p>Any execution of an <code>org.quartz.Job</code> implementation will automatically perform the dependency injection of its visible fields without the need of declaring the class as a singleton.</p>
<h1>Message Queue Provider</h1>
<p>To handle communication between the different components, PDP uses a message queue. The following is a quick overview of the available configuration fields, examples of how to use the Message Queue classes, and the currently available implementations with configuration examples.</p>
<h2>Configuration</h2>
<h3>Basic Configuration</h3>
<table>
<thead>
<tr><th>Property</th><th>Type</th><th>Default</th><th>Description</th></tr>
</thead>
<tbody>
<tr><td><code>messageQueue.provider</code></td><td>String</td><td></td><td>The name of the MessageQueue provider to use (e.g. <code>rabbitmq</code> to load the RabbitMQ provider).</td></tr>
</tbody>
</table>
<h2>Using the MessageQueueProvider</h2>
<p>The MessageQueueProvider instance can be injected as follows:</p>
<pre><code class="language-java">public class MyClass {
  @Inject
  MessageQueueProvider mqProvider;
  ...
</code></pre>
<h2>Messages</h2>
<h3>Message Properties</h3>
<table>
<thead>
<tr><th>Property</th><th>Type</th><th>Default</th><th>Description</th></tr>
</thead>
<tbody>
<tr><td><code>body</code></td><td>byte[]</td><td></td><td>The body of the message.</td></tr>
<tr><td><code>queue</code></td><td>String</td><td></td><td>The queue this message was sent to or delivered from.</td></tr>
<tr><td><code>type</code></td><td>Message.MessageType</td><td></td><td>The type of the message. Can be DIRECT or BROADCAST. See <a href="#message-types">Message Types</a>.</td></tr>
<tr><td><code>properties</code></td><td>Map&lt;String,String&gt;</td><td></td><td>Map of message properties.</td></tr>
</tbody>
</table>
<h3>Message Types</h3>
<h4>Direct Messages</h4>
<p>Direct messages are sent to a given queue and are meant to be consumed by a single consumer. The message is persisted in the queue until a consumer is available.</p>
<h4>Broadcast Messages</h4>
<p>Broadcast messages are sent to a given queue and are meant to be consumed by all consumers available at the moment of sending. The message is not persisted if no consumers are available.</p>
<h3>Creating a new Message</h3>
<p>The Message class provides a builder to simplify the message creation process:</p>
<pre><code class="language-java">Message newMessage = Message.builder()
    .body(&quot;My message&quot;)
    .type(Message.MessageType.DIRECT)
    .queue(&quot;queue&quot;)
    .property(&quot;key&quot;, &quot;value&quot;)
    .build();
</code></pre>
<p>Some considerations with the Message builder:</p>
<ul>
<li>There are multiple ways to set the message body. If more than one is set, only the last one will be used.</li>
<li>Every time the <code>property</code> method is called, the given property is appended to the map.</li>
<li>Calling the <code>properties</code> method will overwrite any previously set properties.</li>
</ul>
<h2>Consumers</h2>
<h3>Configuration</h3>
<table>
<thead>
<tr><th>Property</th><th>Type</th><th>Default</th><th>Description</th></tr>
</thead>
<tbody>
<tr><td><code>consumer.{name}.maxMessages</code></td><td>Integer</td><td>5</td><td>Maximum number of messages to consume in parallel.</td></tr>
<tr><td><code>consumer.{name}.maxMessageRequeues</code></td><td>Integer</td><td>5</td><td>Maximum number of times a message should be sent back to the queue for retry.</td></tr>
</tbody>
</table>
<h4>Example</h4>
<pre><code class="language-yaml">messageQueue:
  consumer:
    directConsumer:
      maxMessages: 4
      maxMessageRequeues: 6
</code></pre>
<p>This configuration can be loaded as follows:</p>
<pre><code class="language-java">protected void myMethod(@Named(&quot;direct-consumer&quot;) ConsumerProperties consumerProperties) {
  try {
    var maxRequeues = consumerProperties.getMaxMessageRequeues();
    ...
  }
}
</code></pre>
<h3>Registering Consumers</h3>
<p>To register a consumer do the following:</p>
<pre><code class="language-java">public class MyClass {
  @Inject
  MessageQueueProvider mqProvider;
  ...
  protected void myMethod(@Named(&quot;direct-consumer&quot;) ConsumerProperties consumerProperties) {
    try {
      mqProvider.registerConsumer(queue, consumerProperties, Message.MessageType.DIRECT, this::consumeMessage);
      ...
  }
}
</code></pre>
<p>The <code>registerConsumer</code> method takes the following parameters:</p>
<table>
<thead>
<tr><th>Parameter</th><th>Type</th><th>Description</th></tr>
</thead>
<tbody>
<tr><td>queue</td><td>String</td><td>Name of the queue to register the Consumer to.</td></tr>
<tr><td>consumerProperties</td><td>ConsumerProperties</td><td>A ConsumerProperties instance for the consumer.</td></tr>
<tr><td>messageType</td><td>Message.MessageType</td><td>The type of message this consumer is supposed to listen to. Can be Message.MessageType.DIRECT or Message.MessageType.BROADCAST. See <a href="#message-types">Message Types</a>.</td></tr>
<tr><td>onMessageDelivery</td><td>Predicate&lt;Message&gt;</td><td>The function to execute when a message is received. Returns true if the message is successfully consumed, false otherwise.</td></tr>
</tbody>
</table>
<h2>Producers</h2>
<h3>Configuration</h3>
<table>
<thead>
<tr><th>Property</th><th>Type</th><th>Default</th><th>Description</th></tr>
</thead>
<tbody>
<tr><td><code>producer.threads</code></td><td>Integer</td><td>5</td><td>Number of threads available for producers to send async messages.</td></tr>
<tr><td><code>producer.sendMessageTimeout</code></td><td>Duration</td><td>30s</td><td>Maximum time to wait to get a confirmation that a message has been successfully sent.</td></tr>
<tr><td><code>producer.retry.retries</code></td><td>Integer</td><td>5</td><td>Maximum number of times to try to send a message.</td></tr>
<tr><td><code>producer.retry.delay</code></td><td>Duration</td><td>1s</td><td>Time to wait before retrying. The time is multiplied by the number of retries to back off between executions.</td></tr>
</tbody>
</table>
<h4>Example</h4>
<pre><code class="language-yaml">messageQueue:
  producer:
    sendMessageTimeout: 10000
    threads: 10
    retry:
      retries: 10
      delay: 10000
</code></pre>
<p>This configuration can be loaded by injecting the ProducerProperties instance:</p>
<pre><code class="language-java">public class MyClass {
  @Inject
  ProducerProperties producerProperties;
  ...
</code></pre>
<h3>Getting a Producer instance</h3>
<p>To get a new producer instance do the following:</p>
<pre><code class="language-java">public class MyClass {
  @Inject
  MessageQueueProvider mqProvider;
  ...
  protected void myMethod() {
    try {
      var myProducer = mqProvider.getProducer();
      ... 
  }
}
</code></pre>
<h3>Sending a message</h3>
<h4>Async Messages</h4>
<pre><code class="language-java">producer.asyncSend(newMessage,
    (message) -&gt; messageSuccessHandler(message),
    (message, exception) -&gt; messageFailureHandler(message, exception));
</code></pre>
<p>The <code>asyncSend</code> method takes the following parameters:</p>
<table>
<thead>
<tr><th>Parameter</th><th>Type</th><th>Description</th></tr>
</thead>
<tbody>
<tr><td>message</td><td>Message</td><td>The <a href="#messages">Message</a> to send.</td></tr>
<tr><td>successHandler</td><td>Consumer&lt;Message&gt;</td><td>A consumer function to execute when the message is successfully sent.</td></tr>
</tbody>
</table>
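<p>The callback contract of <code>asyncSend</code> can be illustrated with a minimal, self-contained sketch. It sends on a thread pool, retries with a delay multiplied by the attempt number (as described for <code>producer.retry.delay</code>), and reports the outcome through a success or a failure handler. Every type below (<code>SketchMessage</code>, <code>Transport</code>, <code>SketchProducer</code>) is a hypothetical stand-in and not part of the PDP API, so the real producer may behave differently.</p>

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public class AsyncSendSketch {

  /** Hypothetical stand-in for a message: only a queue name and a body. */
  static final class SketchMessage {
    final String queue;
    final String body;

    SketchMessage(String queue, String body) {
      this.queue = queue;
      this.body = body;
    }
  }

  /** Hypothetical transport; a real provider would talk to the broker here. */
  interface Transport {
    void deliver(SketchMessage message) throws Exception;
  }

  /** Hypothetical producer: sends on a thread pool and reports through callbacks. */
  static final class SketchProducer implements AutoCloseable {
    private final Transport transport;
    private final ExecutorService executor;
    private final int maxAttempts;
    private final Duration delay;

    SketchProducer(Transport transport, int threads, int maxAttempts, Duration delay) {
      this.transport = transport;
      this.executor = Executors.newFixedThreadPool(threads);
      this.maxAttempts = maxAttempts;
      this.delay = delay;
    }

    void asyncSend(SketchMessage message,
                   Consumer<SketchMessage> successHandler,
                   BiConsumer<SketchMessage, Exception> failureHandler) {
      executor.submit(() -> {
        Exception lastError = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
          try {
            transport.deliver(message);
          } catch (Exception ex) {
            lastError = ex;
            // Back off: the base delay multiplied by the attempt number
            if (attempt < maxAttempts && !sleep(delay.toMillis() * attempt)) {
              break; // interrupted while backing off
            }
            continue;
          }
          successHandler.accept(message);
          return;
        }
        failureHandler.accept(message, lastError);
      });
    }

    /** Sleeps for the given time; returns false if the thread was interrupted. */
    private static boolean sleep(long millis) {
      try {
        Thread.sleep(millis);
        return true;
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
        return false;
      }
    }

    @Override
    public void close() {
      executor.shutdown();
    }
  }

  public static void main(String[] args) throws Exception {
    AtomicInteger attempts = new AtomicInteger();
    CountDownLatch done = new CountDownLatch(1);
    // Fails on the first two attempts and succeeds on the third
    Transport flaky = message -> {
      if (attempts.incrementAndGet() < 3) {
        throw new IllegalStateException("broker unavailable");
      }
    };
    try (SketchProducer producer = new SketchProducer(flaky, 2, 5, Duration.ofMillis(10))) {
      producer.asyncSend(
          new SketchMessage("queue", "My message"),
          message -> {
            System.out.println("sent after " + attempts.get() + " attempts");
            done.countDown();
          },
          (message, exception) -> {
            System.out.println("failed: " + exception.getMessage());
            done.countDown();
          });
      done.await(5, TimeUnit.SECONDS);
    }
  }
}
```

<p>In this sketch the success handler runs outside the <code>try</code> block, so an exception thrown by the handler itself does not trigger another delivery attempt. Whether the PDP producer makes the same choice is not stated in this guide.</p>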