Staging Connector

The Staging Connector connects to the Staging API to perform full or incremental crawls based on a change token. Refer to the Staging API documentation for more details.

Pre-Requisites

The bucket that will be looked up must exist on the Staging API.

Configuration

The connector can be configured as a seed, and also as a processor for the lookup action.

Example configuration as the lookup processor:

{
  "type": "staging-connector",
  "action": "lookup",
  "name": "Extract additional data",  
  "sourceId": "/a/field/pointer",
  "nestedSourceField": "/another/field/pointer",
  "targetBucket": "a_bucket_name",
  "connection": {
    "servers": [
      {
        "host": "localhost",
        "port": 8081
      }
    ],
    "connectTimeout": "PT5m",
    "readTimeout": "PT5m"
  }
}

Lookup action

The lookup action is configured as a processor and has the following fields:

sourcePath

(Required, String) A JSON pointer into the source document. The referenced field must be a string, but it can be nested within an array.

For example:

{
  "id": "xyz",
  "nested": [
    {
      "id": "1"
    },
    {
      "id": "2"
    }
  ]
}

With sourcePath set to "/id", the connector searches for "xyz" in the target bucket. With sourcePath set to "/nested" and nestedSourceField set to "/id", it searches for "1" and "2" in the target bucket.

If sourcePath refers to a string, the target contents are merged into the current document. If sourcePath refers to an array, the array itself is replaced and merged with the contents of the target data.
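The merge behaviour described above can be pictured with a small Python sketch. It is only an illustration under stated assumptions: the target bucket is modelled as an in-memory dictionary keyed by record id, `resolve_pointer` and `lookup` are hypothetical helper names, and the rule that target fields win on key conflicts is a guess rather than documented connector behaviour.

```python
from typing import Any, Optional


def resolve_pointer(doc: Any, pointer: str) -> Any:
    """Resolve a simple JSON pointer such as "/id" (no "~" escape handling)."""
    node = doc
    for token in pointer.strip("/").split("/"):
        if token == "":
            continue  # an empty pointer refers to the whole document
        node = node[int(token)] if isinstance(node, list) else node[token]
    return node


def lookup(doc: dict, bucket: dict, source_path: str,
           nested_source_field: Optional[str] = None) -> dict:
    """Return a copy of doc enriched with records found in bucket."""
    result = dict(doc)
    value = resolve_pointer(doc, source_path)
    if isinstance(value, str):
        # String source: merge the target record into the document.
        # Assumption: target fields win on key conflicts.
        result.update(bucket.get(value, {}))
    elif isinstance(value, list):
        if nested_source_field is None:
            raise ValueError("nested_source_field is required for array sources")
        # Array source: replace the array with its elements merged with target data.
        # Simplification: only a top-level array (for example "/nested") is handled.
        result[source_path.strip("/")] = [
            {**item, **bucket.get(resolve_pointer(item, nested_source_field), {})}
            for item in value
        ]
    return result


# Hypothetical data: the document from the example and an in-memory "bucket".
doc = {"id": "xyz", "nested": [{"id": "1"}, {"id": "2"}]}
bucket = {"xyz": {"title": "Root"}, "1": {"title": "One"}, "2": {"title": "Two"}}

by_id = lookup(doc, bucket, "/id")                 # sourcePath "/id"
by_nested = lookup(doc, bucket, "/nested", "/id")  # sourcePath "/nested", nestedSourceField "/id"
```

In this sketch `by_id` gains the fields of the "xyz" record at the top level, while `by_nested` keeps the top level untouched and enriches each element of "nested" instead.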

nestedSourceField

(Required, String) A JSON pointer within each nested object, used when sourcePath refers to an array.

targetBucket

(Required, String) The name of the target bucket to read data from.

Query Scan

Query scan is configured on the seed and allows queries, filters and aggregations on the target repository. To use it, you MUST set "scanAction" on the seed to "query" and provide a query in the /seed/scan configuration.

Example seed that pulls documents whose "Content-Type" metadata is missing or empty:

{
  "processAction": "process",
  "seed": {
    "servers": [
      {
        "host": "localhost",
        "port": 8081
      }
    ],
    "connectTimeout": "PT5m",
    "readTimeout": "PT5m",
    "scan": {
      "bucket": "source_bucket_name",
      "scroll": {
        "size": 100
      },
      "query": {
        "filter": {
          "or": [
            {
              "exists": {
                "fieldName": "content.metadata.Content-Type",
                "present": false
              }
            },
            {
              "equals": {
                "fieldName": "content.metadata.Content-Type",
                "value": ""
              }
            }
          ]
        }
      }
    }
  },
  "name": "Records missing content type",
  "active": true,
  "scanAction": "query",
  "type": "staging-connector",
  "batchSize": 25,
  "properties": {
    "index": "target_data_source_index_name"
  },
  "pipelineId": "{{ fromName('Persist to Target Data Source') }}",
  "id": "db122b46-9104-48e7-8847-c1f2d720d596"
}

/seed/scan/query

(Required, JSON) A JSON object with the query specification. It must adhere to the Query Language convention defined in the Staging API.

/seed/scan/scroll

(Optional, JSON) A JSON object with the scroll properties.
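The requirement that "scanAction" and /seed/scan/query go together can be checked before a seed is submitted. The following Python sketch shows one possible pre-flight check. The function name `check_query_scan` is hypothetical, and the check assumes the seed layout used in the example above, with "seed" and "scanAction" at the top level of the configuration.

```python
from typing import List


def check_query_scan(seed_config: dict) -> List[str]:
    """Return a list of problems found in a seed configuration (empty if none)."""
    problems = []
    scan = seed_config.get("seed", {}).get("scan", {})

    if seed_config.get("scanAction") == "query":
        # A query scan needs a JSON object under /seed/scan/query.
        if not isinstance(scan.get("query"), dict):
            problems.append('scanAction is "query" but /seed/scan/query is missing')
    elif "query" in scan:
        # The query is only used when scanAction is set to "query".
        problems.append('/seed/scan/query is set but scanAction is not "query"')

    # /seed/scan/scroll is optional, but must be a JSON object when present.
    if "scroll" in scan and not isinstance(scan["scroll"], dict):
        problems.append("/seed/scan/scroll must be a JSON object")

    return problems


valid_seed = {
    "scanAction": "query",
    "seed": {"scan": {"bucket": "source_bucket_name",
                      "scroll": {"size": 100},
                      "query": {"filter": {}}}},
}
missing_query = {
    "scanAction": "query",
    "seed": {"scan": {"bucket": "source_bucket_name"}},
}

valid_problems = check_query_scan(valid_seed)
missing_problems = check_query_scan(missing_query)
```

Returning a list of messages rather than raising on the first problem makes it easier to report every issue in a configuration at once.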

Examples

Simple example that extracts all content from a bucket:

{
  "seed": {
    "servers": [
      {
        "host": "localhost",
        "port": 8081
      }
    ],
    "scan": {
      "bucket": "staging_source"
    }
  },
  "type": "staging-connector",
  "name": "Staging connector",
  "active": true,
  "processAction": "process",
  "scanAction": "scan",
  "batchSize": 25,
  "pipelineId": "1dc3b770-db77-4009-a20d-40d1aa5c3c85"
}

Filter

Use a filter in the query to extract records with a specific payload value.

Filter object:

"query": {
    "filter": {
        "equals": {
            "fieldName": "content.payload",
            "value": "2vnBU7jguQYM0lfTDf1BHb0aRJYF8sKcriFFx29rqn4sbnrCqF50mYBX6h7C2WwrOx1b7bHJEwPzEPMxUNWIlqA6waWasuFjhnPyHxT6EihhQFyyebiE8dfj0CXD8vtH"
        }
    }
}

Filter object in seed configuration:

{
  "type": "staging-connector",
  "id": "5e50d861-ec81-48f6-bd4a-2dd498a43c61",
  "creationTimestamp": 1685999446303,
  "lastUpdatedTimestamp": 1685999446303,
  "name": "Staging connector Test",
  "description": null,
  "labels": {},
  "active": true,
  "config": {
    "seed": {
      "servers": [
        {
          "host": "localhost",
          "port": 8081
        }
      ],
      "scan": {
        "bucket": "target-bucket",
        "scroll": {
          "size": 10
        },
        "query": {
          "filter": {
            "equals": {
              "fieldName": "content.payload",
              "value": "2vnBU7jguQYM0lfTDf1BHb0aRJYF8sKcriFFx29rqn4sbnrCqF50mYBX6h7C2WwrOx1b7bHJEwPzEPMxUNWIlqA6waWasuFjhnPyHxT6EihhQFyyebiE8dfj0CXD8vtH"
            }
          }
        }
      }
    }
  },
  "pipelineId": "019d2492-6712-4444-8703-7fcc70998b34",
  "credentialId": null,
  "scanAction": "query",
  "processAction": "process",
  "batchSize": 25,
  "properties": null,
  "idPrefix": null,
  "hashRecordIds": false,
  "jobRetries": 2,
  "erroredRecordThreshold": 0.95,
  "jobTimeout": "PT24H"
}
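When many seeds share similar filters, the filter objects can be assembled programmatically instead of being written by hand. The Python sketch below uses only the filter shapes that appear in this document ("equals", "exists" and "or"). The helper names `equals`, `exists` and `any_of` are hypothetical, and other operators of the Staging API Query Language are not covered.

```python
import json


def equals(field_name: str, value) -> dict:
    """Build an "equals" filter for a single field."""
    return {"equals": {"fieldName": field_name, "value": value}}


def exists(field_name: str, present: bool = True) -> dict:
    """Build an "exists" filter; present=False matches records lacking the field."""
    return {"exists": {"fieldName": field_name, "present": present}}


def any_of(*filters: dict) -> dict:
    """Combine filters with "or"."""
    return {"or": list(filters)}


# Rebuild the "missing or empty Content-Type" query from the Query Scan example.
missing_content_type = {
    "filter": any_of(
        exists("content.metadata.Content-Type", present=False),
        equals("content.metadata.Content-Type", ""),
    )
}

# The resulting dictionary can be placed under /seed/scan/query.
print(json.dumps(missing_content_type, indent=2))
```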

Projection

Use a projection to exclude a field.

Projection object:

"projection": {
    "fields": ["content.counter"],
    "type": "EXCLUDE"
}

Query with projection object:

{
  "type": "staging-connector",
  "id": "5e50d861-ec81-48f6-bd4a-2dd498a43c61",
  "creationTimestamp": 1685999446303,
  "lastUpdatedTimestamp": 1686001568951,
  "name": "Staging connector Test",
  "description": null,
  "labels": {},
  "active": true,
  "config": {
    "seed": {
      "servers": [
        {
          "host": "localhost",
          "port": 8081
        }
      ],
      "scan": {
        "bucket": "target-bucket",
        "scroll": {
          "size": 10
        },
        "query": {
          "projection": {
            "fields": ["content.counter"],
            "type": "EXCLUDE"
          },
          "filter": {
            "equals": {
              "fieldName": "content.payload",
              "value": "2vnBU7jguQYM0lfTDf1BHb0aRJYF8sKcriFFx29rqn4sbnrCqF50mYBX6h7C2WwrOx1b7bHJEwPzEPMxUNWIlqA6waWasuFjhnPyHxT6EihhQFyyebiE8dfj0CXD8vtH"
            }
          }
        }
      }
    }
  },
  "pipelineId": "019d2492-6712-4444-8703-7fcc70998b34",
  "credentialId": null,
  "scanAction": "query",
  "processAction": "process",
  "batchSize": 25,
  "properties": null,
  "idPrefix": null,
  "hashRecordIds": false,
  "jobRetries": 2,
  "erroredRecordThreshold": 0.95,
  "jobTimeout": "PT24H"
}
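To get a feel for what an EXCLUDE projection leaves in a record, the effect can be approximated locally. The Python sketch below is not the Staging API implementation. It assumes that a dotted name such as "content.counter" addresses a nested key, and `exclude_fields` is a hypothetical helper name.

```python
import copy
from typing import List


def exclude_fields(record: dict, fields: List[str]) -> dict:
    """Return a copy of record without the given dotted field paths."""
    result = copy.deepcopy(record)  # leave the input record untouched
    for path in fields:
        *parents, leaf = path.split(".")
        node = result
        # Walk down to the object that holds the leaf key.
        for key in parents:
            node = node.get(key) if isinstance(node, dict) else None
            if node is None:
                break  # path not present in this record, nothing to remove
        if isinstance(node, dict):
            node.pop(leaf, None)
    return result


# Hypothetical record with the fields used in the projection example.
record = {"content": {"counter": 3, "payload": "abc"}}
projected = exclude_fields(record, ["content.counter"])
```

Here `projected` keeps "content.payload" and drops "content.counter", while `record` itself is unchanged.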

Aggregate

Use an aggregation to group by date.

Group object:

{
    "group": {
        "fieldName": "content.generatedAt"
    }
}

Query with group object:

{
  "type": "staging-connector",
  "id": "5e50d861-ec81-48f6-bd4a-2dd498a43c61",
  "creationTimestamp": 1685999446303,
  "lastUpdatedTimestamp": 1686066679572,
  "name": "Staging connector Test",
  "description": null,
  "labels": {},
  "active": true,
  "config": {
    "seed": {
      "servers": [
        {
          "host": "localhost",
          "port": 8081
        }
      ],
      "scan": {
        "bucket": "target-bucket",
        "scroll": {
          "size": 10
        },
        "query": {
          "filter": {
            "exists": {
              "fieldName": "content.generatedAt",
              "present": true
            }
          },
          "aggregate": [
            {
              "group": {
                "fieldName": "content.generatedAt"
              }
            }
          ]
        }
      }
    }
  },
  "pipelineId": "019d2492-6712-4444-8703-7fcc70998b34",
  "credentialId": null,
  "scanAction": "query",
  "processAction": "process",
  "batchSize": 25,
  "properties": null,
  "idPrefix": null,
  "hashRecordIds": false,
  "jobRetries": 2,
  "erroredRecordThreshold": 0.95,
  "jobTimeout": "PT24H"
}

Known limitations

Lookup action

Changes in the looked-up bucket do not trigger an update of the original document.

©2024 Pureinsights Technology Corporation