...

log.info("Message", new MyPOJO());
log.info("Message {}", new MyPOJO());
log.info("Message {}", "with placeholder", new MyPOJO());
{"@timestamp":"2021-12-08T15:07:11.379Z","@version":1,"message":"Message","logger_name":"LoggerTest","thread_name":"Test worker","level":"INFO","level_value":20000,"myPOJO":{"id":"707ea126-d94b-47b6-9010-72a3acc8a786"}}
{"@timestamp":"2021-12-08T15:07:11.380Z","@version":1,"message":"Message MyPOJO$373141eb","logger_name":"LoggerTest","thread_name":"Test worker","level":"INFO","level_value":20000,"myPOJO":{"id":"398bb1df-29b3-4738-afc6-a7b9e892ce19"}}
{"@timestamp":"2021-12-08T15:07:11.381Z","@version":1,"message":"Message with placeholder","logger_name":"LoggerTest","thread_name":"Test worker","level":"INFO","level_value":20000,"myPOJO":{"id":"00ba3228-f434-4ed0-b45e-62ec9163aac4"}}

Note that the custom POJO is always written to the log as its own JSON field. The example also shows that the POJO must not have a corresponding placeholder in the message, because the provider resolves it automatically; when it does have one, as in the second call, its string representation is also written into the message. Exceptions must be the last argument:

...
} catch (Exception ex) {
  log.error(ex.getMessage(), new MyPOJO(), ex);
}

If the logging handler is part of the PDP Core Common Libraries project, it must be registered in the static section of the com.pureinsights.pdp.core.logging.CoreArgumentsProvider class.

Any other logging handler under an application context must be declared as a bean (e.g. with @Singleton).

If none of the above is applicable, the handler must be registered using the CoreArgumentsProvider#registerHandler(PojoLoggingHandler) method.

Metrics

The metrics of the application are collected and exposed by Micrometer. They are enabled by default in the application.yml and exposed through the GET /metrics and GET /metrics/[name] endpoints:

micronaut:
  metrics:
    enabled: true
{
  "names": [
    "http.server.requests",
    "jvm.buffer.count",
    "jvm.buffer.memory.used",
    ...,
    "pdp.metric.count"
  ]
}

When available, the metrics can be filtered with the use of tags: GET /metrics/pdp.metric.count?tag=source:component&tag=type:value1
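The tag filter is a repeated `tag=key:value` query parameter. The following is a minimal sketch of how a client could assemble such a request URI. `MetricsUriBuilder` and the base URL are hypothetical names used only for illustration, not part of PDP, and the sketch assumes tag keys and values need no URL encoding.

```java
import java.net.URI;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper (not part of PDP) that builds a tag-filtered metrics URI. */
final class MetricsUriBuilder {

  private MetricsUriBuilder() {
  }

  /**
   * Builds {baseUrl}/metrics/{metricName}?tag=k1:v1&tag=k2:v2.
   * Assumption: keys and values contain no characters that require URL encoding.
   */
  static URI build(String baseUrl, String metricName, Map<String, String> tags) {
    // One "tag=key:value" pair per entry, joined in the map's iteration order.
    String query = tags.entrySet().stream()
        .map(entry -> "tag=" + entry.getKey() + ":" + entry.getValue())
        .collect(Collectors.joining("&"));
    String path = baseUrl + "/metrics/" + metricName;
    return URI.create(query.isEmpty() ? path : path + "?" + query);
  }
}
```

Passing an insertion-ordered map (for example a `LinkedHashMap`) keeps the tags in the order they were added. The resulting URI can be handed to any HTTP client to issue the GET request.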

By default, all metrics include the component tag with the name of the application.

Annotation-based metrics

Any method in any bean can be measured with:

  • @Timed
  • @Counted
@Singleton
public class MyService {

  @Timed(value = "pdp.myservice.execute", histogram = true)
  public void execute(UUID id) {
    ...
  }


  @Counted("pdp.myservice.validate")
  public void validate(UUID id) {
    ...
  }
}

The name of the metric must be in lowercase, using dot notation.
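The naming rule can be checked mechanically, for instance in a unit test. The following is a minimal sketch: `MetricNames` is a hypothetical helper, not part of PDP or Micrometer, and the exact set of allowed characters (lowercase letters and digits, with each segment starting with a letter) is an assumption.

```java
import java.util.regex.Pattern;

/** Hypothetical helper (not part of PDP or Micrometer) that checks the naming rule. */
final class MetricNames {

  // Assumption: each segment starts with a lowercase letter and contains only
  // lowercase letters and digits; segments are separated by single dots.
  private static final Pattern LOWERCASE_DOTTED =
      Pattern.compile("[a-z][a-z0-9]*(\\.[a-z][a-z0-9]*)*");

  private MetricNames() {
  }

  /** Returns true when the name is lowercase and uses dot notation. */
  static boolean isValid(String name) {
    return name != null && LOWERCASE_DOTTED.matcher(name).matches();
  }
}
```

Under these assumptions, `pdp.myservice.execute` is accepted, while `pdp.MyService.execute` and `pdp_myservice` are rejected.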

Custom metrics

Instead of using annotations, the metric registry can be injected and manually handled in any bean, with any type of meter:

@Singleton
public class MyCustomService {
  private Counter counter;


  @PostConstruct
  void init(MeterRegistry meterRegistry) {
    counter = Counter.builder("pdp.mycustomservice.count")
            .tag("key", "value")
            .register(meterRegistry);
  }


  public void count() {
    counter.increment();
  }
}

Retries

RetryUtils, as its name suggests, provides utility methods to retry a Runnable or Supplier. It also handles the exceptions that might occur during the run.

By default, RetryUtils uses a ConstantDelayRetry, which retries 3 times with a delay of 5 seconds.

...
AtomicInteger atomicInteger = new AtomicInteger();
RetryUtils.retry(atomicInteger::getAndIncrement);

You can also specify a custom retry policy by passing it as a parameter.

...
AtomicInteger atomicInteger = new AtomicInteger();
RetryUtils.retry(atomicInteger::getAndIncrement, new ConstantDelayRetry(2, 100));

If a failure occurs during an attempt, the exception is thrown and RetryUtils handles it. When the number of retries reaches the limit specified in the retry policy, a CoreException is thrown.
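To illustrate the behaviour described above, here is a minimal, self-contained sketch of a constant-delay retry loop. It is not the RetryUtils implementation: `ConstantDelayRetrySketch` and `RetriesExhaustedException` (a stand-in for CoreException) are hypothetical, and the sketch assumes that "retry N times" means one initial attempt followed by up to N retries.

```java
import java.util.function.Supplier;

/** Illustrative constant-delay retry loop; NOT the RetryUtils implementation. */
final class ConstantDelayRetrySketch {

  /** Hypothetical stand-in for the CoreException mentioned in the text. */
  static final class RetriesExhaustedException extends RuntimeException {
    RetriesExhaustedException(String message, Throwable cause) {
      super(message, cause);
    }
  }

  private ConstantDelayRetrySketch() {
  }

  /**
   * Runs the action once and, on failure, retries it up to maxRetries times,
   * waiting delayMillis between attempts (assumed interpretation of the policy).
   */
  static <T> T retry(Supplier<T> action, int maxRetries, long delayMillis) {
    RuntimeException lastFailure = null;
    for (int attempt = 0; attempt <= maxRetries; attempt++) {
      try {
        return action.get();
      } catch (RuntimeException ex) {
        // The failure is handled here; it only surfaces once retries run out.
        lastFailure = ex;
        if (attempt < maxRetries) {
          sleep(delayMillis);
        }
      }
    }
    throw new RetriesExhaustedException(
        "Action still failing after " + maxRetries + " retries", lastFailure);
  }

  private static void sleep(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException ie) {
      // Restore the interrupt flag and stop retrying.
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting to retry", ie);
    }
  }
}
```

With this sketch, `ConstantDelayRetrySketch.retry(supplier, 3, 5000)` would mirror the default policy described above under the stated assumption.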

Scheduler

Scheduled tasks are handled by Quartz. Any application can inject the org.quartz.Scheduler bean to register/unregister its own cron jobs.

Any execution of an org.quartz.Job implementation will automatically perform the dependency injection of its visible fields without the need to declare the class as a singleton.

Message Queue Provider

To handle communication between the different components, PDP uses a message queue. The following is a quick overview of the available configuration fields, examples of how to use the Message Queue classes, and the currently available implementations with configuration examples.

Configuration

Basic Configuration

| Property              | Type   | Default | Description                                                                                  |
|-----------------------|:------:|:-------:|----------------------------------------------------------------------------------------------|
| messageQueue.provider | String |         | The name of the MessageQueue provider to use (e.g. rabbitmq to load the RabbitMQ provider). |

Using the MessageQueueProvider

The MessageQueueProvider instance can be injected as follows:

public class MyClass {

  @Inject
  MessageQueueProvider mqProvider;
  ...

Messages

Message Properties

| Property   | Type                | Default | Description                                                             |
|------------|:-------------------:|:-------:|-------------------------------------------------------------------------|
| body       | byte[]              |         | The body of the message.                                                |
| queue      | String              |         | The queue this message was sent to or delivered from.                   |
| type       | Message.MessageType |         | The type of the message. Can be DIRECT or BROADCAST. See Message Types. |
| properties | Map<String,String>  |         | Map of message properties.                                              |

Message Types

Direct Messages

Direct messages are sent to a given queue and are meant to be consumed by a single consumer. The message is persisted in the queue until a consumer is available.

Broadcast Messages

Broadcast messages are sent to a given queue and are meant to be consumed by all consumers available at the moment of sending. The message is not persisted if no consumers are available.
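The difference between the two message types can be seen in a toy in-memory model. The sketch below is not the PDP provider: `DeliverySemanticsSketch` is hypothetical, message bodies are plain strings, and picking the direct-message consumer in round-robin order is an assumption made only to keep the example deterministic.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

/** Toy in-memory model of the two delivery modes; NOT the PDP provider. */
final class DeliverySemanticsSketch {
  private final Queue<String> pendingDirect = new ArrayDeque<>();
  private final List<Consumer<String>> consumers = new ArrayList<>();
  private int nextConsumer = 0;

  /** Registers a consumer and hands it any direct messages that were waiting. */
  void register(Consumer<String> consumer) {
    consumers.add(consumer);
    while (!pendingDirect.isEmpty()) {
      consumer.accept(pendingDirect.poll());
    }
  }

  /** Direct: one consumer receives the message; it is kept if none is registered. */
  void sendDirect(String body) {
    if (consumers.isEmpty()) {
      pendingDirect.add(body);
      return;
    }
    // Assumption: consumers take turns (round-robin).
    Consumer<String> target = consumers.get(nextConsumer % consumers.size());
    nextConsumer = (nextConsumer + 1) % consumers.size();
    target.accept(body);
  }

  /** Broadcast: every registered consumer receives a copy; dropped if there are none. */
  void sendBroadcast(String body) {
    for (Consumer<String> consumer : consumers) {
      consumer.accept(body);
    }
  }
}
```

In this model, a broadcast sent before any consumer registers is lost, whereas a direct message sent at the same moment is delivered as soon as the first consumer registers.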

Creating a new Message

The Message class provides a builder to simplify the message creation process:

Message newMessage = Message.builder()
        .body("My message")
        .type(Message.MessageType.DIRECT)
        .queue("queue")
        .property("key", "value")
        .build();

Some considerations with the Message builder:

  • There are multiple ways to set the message body. If more than one is set, only the last one will be used.
  • Every time the property method is called, the given property is appended to the map.
  • Calling the properties method overwrites any previously set properties.
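The three rules above can be made concrete with a small stand-alone builder. This is only a sketch of the described semantics, not the PDP Message class: `MessageBuilderSketch`, its accessor methods, and the use of UTF-8 for text bodies are assumptions.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical mini-builder mirroring the rules above; NOT the PDP Message class. */
final class MessageBuilderSketch {
  private byte[] body = new byte[0];
  private Map<String, String> properties = new LinkedHashMap<>();

  /** Sets the body from text (UTF-8 assumed); replaces any body set earlier. */
  MessageBuilderSketch body(String text) {
    this.body = text.getBytes(StandardCharsets.UTF_8);
    return this;
  }

  /** Sets the body from raw bytes; replaces any body set earlier. */
  MessageBuilderSketch body(byte[] bytes) {
    this.body = bytes.clone();
    return this;
  }

  /** Adds a single entry to the existing property map. */
  MessageBuilderSketch property(String key, String value) {
    this.properties.put(key, value);
    return this;
  }

  /** Replaces the whole property map, discarding earlier entries. */
  MessageBuilderSketch properties(Map<String, String> newProperties) {
    this.properties = new LinkedHashMap<>(newProperties);
    return this;
  }

  /** Returns the current body decoded as UTF-8 text. */
  String bodyAsText() {
    return new String(body, StandardCharsets.UTF_8);
  }

  /** Returns a copy of the current properties. */
  Map<String, String> currentProperties() {
    return new LinkedHashMap<>(properties);
  }
}
```

Calling `body(...)` twice keeps only the second value, repeated `property(...)` calls accumulate entries, and a later `properties(...)` call discards everything set before it.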

Consumers

Configuration

| Property                           | Type    | Default | Description                                                                   |
|------------------------------------|:-------:|:-------:|-------------------------------------------------------------------------------|
| consumer.{name}.maxMessages        | Integer | 5       | Maximum number of messages to consume in parallel.                            |
| consumer.{name}.maxMessageRequeues | Integer | 5       | Maximum number of times a message should be sent back to the queue for retry. |

Example

messageQueue:
  consumer:
    directConsumer:
      maxMessages: 4
      maxMessageRequeues: 6

This configuration can be loaded as follows:

protected void myMethod(@Named("direct-consumer") ConsumerProperties consumerProperties) {
  try {
    var maxRequeues = consumerProperties.getMaxMessageRequeues();
    ...
  }
}

Registering Consumers

To register a consumer do the following:

public class MyClass {

  @Inject
  MessageQueueProvider mqProvider;
  ...

  protected void myMethod(@Named("direct-consumer") ConsumerProperties consumerProperties) {
    try {
      mqProvider.registerConsumer(queue, consumerProperties, Message.MessageType.DIRECT, this::consumeMessage);
      ...
    }
  }

The registerConsumer method takes the following parameters:

| Parameter          | Type                | Description                                                                                                                                          |
|--------------------|:-------------------:|------------------------------------------------------------------------------------------------------------------------------------------------------|
| queue              | String              | Name of the queue to register the Consumer to.                                                                                                       |
| consumerProperties | ConsumerProperties  | A ConsumerProperties instance for the consumer.                                                                                                      |
| messageType        | Message.MessageType | The type of message this consumer is supposed to listen to. Can be Message.MessageType.DIRECT or Message.MessageType.BROADCAST. See Message Types.  |
| onMessageDelivery  | Predicate<Message>  | The function to execute when a message is received. Returns true if the message is successfully consumed, false otherwise.                          |
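The interaction between the boolean result of onMessageDelivery and the maxMessageRequeues setting can be sketched with a toy in-memory queue. This is not the PDP consumer implementation: `RequeueSketch` and `Delivery` are hypothetical, message bodies are plain strings, and dropping a message once its requeue limit is reached is an assumption (the source does not say what happens to such messages).

```java
import java.util.Deque;
import java.util.function.Predicate;

/** Toy model of requeue-on-failure; NOT the PDP consumer implementation. */
final class RequeueSketch {

  /** A message body plus the number of times it has been requeued so far. */
  static final class Delivery {
    final String body;
    final int requeues;

    Delivery(String body, int requeues) {
      this.body = body;
      this.requeues = requeues;
    }
  }

  private RequeueSketch() {
  }

  /**
   * Drains the queue, passing each body to the handler. A false result sends the
   * message back to the queue until it has been requeued maxMessageRequeues times.
   * Returns the number of messages given up on (assumed to be dropped).
   */
  static int drain(Deque<Delivery> queue, Predicate<String> onMessageDelivery,
      int maxMessageRequeues) {
    int dropped = 0;
    while (!queue.isEmpty()) {
      Delivery delivery = queue.poll();
      if (onMessageDelivery.test(delivery.body)) {
        continue; // Successfully consumed.
      }
      if (delivery.requeues < maxMessageRequeues) {
        queue.add(new Delivery(delivery.body, delivery.requeues + 1));
      } else {
        dropped++;
      }
    }
    return dropped;
  }
}
```

Under these assumptions, a handler that always returns false is invoked once for the initial delivery and once per requeue before the message is given up on.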

Producers