Forward workflow execution logs to one or more desired destinations.
The Log Shipper task extracts logs from the Kestra backend and loads them to desired destinations including Datadog, Elasticsearch, New Relic, OpenTelemetry, AWS CloudWatch, Google Operational Suite, and Azure Monitor.
The task works incrementally in batches:
- Determines the starting timestamp using either:
  - The last successfully processed log's timestamp (persisted in the KV Store using the offsetKey)
  - The current time minus the lookbackPeriod duration, if no previous state exists
- Sends retrieved logs through configured logExporters
- Stores the timestamp of the last processed log to maintain state between executions
- Subsequent runs continue from the last stored timestamp
This incremental approach ensures reliable log forwarding without gaps or duplicates.
type: "io.kestra.plugin.ee.core.log.LogShipper"
Examples
Ship logs to multiple destinations
id: logShipper
namespace: system
tasks:
  - id: shipLogs
    type: io.kestra.plugin.ee.core.log.LogShipper
    logLevelFilter: INFO
    lookbackPeriod: P1D
    offsetKey: logShipperOffset
    delete: false
    logExporters:
      - id: file
        type: io.kestra.plugin.ee.core.log.FileLogExporter
      - id: awsCloudWatch
        type: io.kestra.plugin.ee.aws.cloudwatch.LogExporter
        accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
        secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}"
        region: us-east-1
        logGroupName: kestra
        logStreamName: production
      - id: S3LogExporter
        type: io.kestra.plugin.ee.aws.s3.LogExporter
        accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
        secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}"
        region: "{{ vars.region }}"
        format: JSON
        bucket: logbucket
        logFilePrefix: kestra-log-file
        maxLinesPerFile: 1000000
      - id: googleOperationalSuite
        type: io.kestra.plugin.ee.gcp.operationalsuite.LogExporter
        projectId: my-gcp-project
      - id: gcs
        type: io.kestra.plugin.ee.gcp.gcs.LogExporter
        projectId: myProjectId
        format: JSON
        maxLinesPerFile: 10000
        bucket: my-bucket
        logFilePrefix: kestra-log-file
        chunk: 1000
      - id: azureMonitor
        type: io.kestra.plugin.ee.azure.monitor.LogExporter
        endpoint: https://endpoint-host.ingest.monitor.azure.com
        tenantId: "{{ secret('AZURE_TENANT_ID') }}"
        clientId: "{{ secret('AZURE_CLIENT_ID') }}"
        clientSecret: "{{ secret('AZURE_CLIENT_SECRET') }}"
        ruleId: dcr-69f0b123041d4d6e9f2bf72aad0b62cf
        streamName: kestraLogs
      - id: azureBlobStorage
        type: io.kestra.plugin.ee.azure.storage.LogExporter
        endpoint: https://myblob.blob.core.windows.net/
        tenantId: "{{ secret('AZURE_TENANT_ID') }}"
        clientId: "{{ secret('AZURE_CLIENT_ID') }}"
        clientSecret: "{{ secret('AZURE_CLIENT_SECRET') }}"
        containerName: logs
        format: JSON
        logFilePrefix: kestra-log-file
        maxLinesPerFile: 1000000
        chunk: 1000
      - id: datadog
        type: io.kestra.plugin.ee.datadog.LogExporter
        basePath: https://http-intake.logs.datadoghq.eu
        apiKey: "{{ secret('DATADOG_API_KEY') }}"
      - id: elasticsearch
        type: io.kestra.plugin.ee.elasticsearch.LogExporter
        indexName: kestra-logs
        connection:
          basicAuth:
            password: "{{ secret('ES_PASSWORD') }}"
            username: kestra_user
          hosts:
            - https://elastic.example.com:9200
      - id: opensearch
        type: io.kestra.plugin.ee.opensearch.LogExporter
        indexName: kestra-logs
        connection:
          basicAuth:
            password: "{{ secret('ES_PASSWORD') }}"
            username: kestra_user
          hosts:
            - https://elastic.example.com:9200
      - id: newRelic
        type: io.kestra.plugin.ee.newrelic.LogExporter
        basePath: https://log-api.newrelic.com
        apiKey: "{{ secret('NEWRELIC_API_KEY') }}"
      - id: openTelemetry
        type: io.kestra.plugin.ee.opentelemetry.LogExporter
        otlpEndpoint: http://otel-collector:4318/v1/logs
        authorizationHeaderName: Authorization
        authorizationHeaderValue: "Bearer {{ secret('OTEL_TOKEN') }}"
triggers:
  - id: dailySchedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 0 * * *"
    disabled: true
Properties
logExporters *Required Non-dynamic FileLogExporter | LogExporter
Min items: 1
List of log shippers
The list of log shippers to use for sending logs
delete boolean | string
Delete logs after export
The log shipper will delete the exported logs
executionId string
Execution to search
The execution ID to use to filter logs
flowId string
Flow to search
The flow ID to use to filter logs
logLevelFilter string
Default: INFO
Log level to send
This property specifies the minimum log level to send.
lookbackPeriod string
Default: P1D
Format: duration
Starting duration before now
If no previous execution or state exists, the fetch start date is set to the current time minus this duration
namespace string
Namespace to search
The namespace to use to filter logs
offsetKey string
Prefix of the KV Store key
The prefix of the KV Store key that stores the end date of the last execution's fetch
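As a minimal sketch of how the filtering properties above can be combined, the flow below ships only WARN-and-above logs for a single namespace and flow to Kestra's internal storage. The flow id, the namespace company.team, the flow my_flow, and the offset key name are illustrative assumptions, not values taken from this reference.

```yaml
id: shipFilteredLogs                     # hypothetical flow id
namespace: system
tasks:
  - id: shipLogs
    type: io.kestra.plugin.ee.core.log.LogShipper
    logLevelFilter: WARN                 # minimum log level to send
    namespace: company.team              # hypothetical namespace used as a filter
    flowId: my_flow                      # hypothetical flow used as a filter
    lookbackPeriod: PT1H                 # only applies when no offset has been stored yet
    offsetKey: filteredLogShipperOffset  # hypothetical KV Store key prefix
    logExporters:
      - id: file
        type: io.kestra.plugin.ee.core.log.FileLogExporter
```

Using a dedicated offsetKey per Log Shipper flow is a design choice in this sketch, so that differently filtered shippers do not share the same stored timestamp.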
Outputs
Definitions
io.kestra.core.http.client.configurations.TimeoutConfiguration
connectTimeout string
Format: duration
The time allowed to establish a connection to the server before failing.
readIdleTimeout string
Default: PT5M
Format: duration
The time allowed for a read connection to remain idle before closing it.
Ship logs to Elasticsearch
connection *Required ElasticsearchConnection
The connection properties.
id *Required string
Pattern: ^[a-zA-Z0-9][a-zA-Z0-9_-]*
Min length: 1
indexName *Required string
The name of the index to send logs to
type *Required object
chunk integer | string
Default: 1000
The chunk size for every bulk request.
io.kestra.core.http.client.configurations.ProxyConfiguration
address string
The address of the proxy server.
password string
The password for proxy authentication.
port integer | string
The port of the proxy server.
type string
Default: DIRECT
Possible values: DIRECT, HTTP, SOCKS
The type of proxy to use.
username string
The username for proxy authentication.
io.kestra.plugin.ee.opensearch.OpensearchConnection
hosts *Required array
List of HTTP OpenSearch servers.
Must be a URI like https://opensearch.com:9200 with scheme and port.
basicAuth OpensearchConnection-BasicAuth
Basic auth configuration.
headers array
List of HTTP headers to be sent on every request.
Each header must be a string with key and value separated by ":", e.g. Authorization: Token XYZ.
pathPrefix string
Sets the path's prefix for every request used by the HTTP client.
For example, if this is set to /my/path, then any client request will become /my/path/ + endpoint.
In essence, every request's endpoint is prefixed by this pathPrefix.
The path prefix is useful for when OpenSearch is behind a proxy that provides a base path or a proxy that requires all paths to start with '/'; it is not intended for other purposes and it should not be supplied in other scenarios.
strictDeprecationMode boolean | string
Whether the REST client should return any response containing at least one warning header as a failure.
trustAllSsl boolean | string
Trust all SSL CA certificates.
Use this if the server is using a self-signed SSL certificate.
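The connection properties above could be combined as in the following sketch, which is an entry meant to sit under logExporters. The host, header, path prefix, and secret name are hypothetical placeholders chosen for illustration.

```yaml
- id: opensearch
  type: io.kestra.plugin.ee.opensearch.LogExporter
  indexName: kestra-logs
  connection:
    hosts:
      - https://opensearch.example.com:9200            # hypothetical host, scheme and port included
    basicAuth:
      username: kestra_user
      password: "{{ secret('OPENSEARCH_PASSWORD') }}"  # hypothetical secret name
    headers:
      - "X-Custom-Header: value"                       # hypothetical header in "key: value" form
    pathPrefix: /opensearch                            # hypothetical base path added by a reverse proxy
    trustAllSsl: true                                  # only for a self-signed certificate
```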
Export logs to S3
bucket *Required string
S3 bucket to upload log files to.
The bucket where log files are going to be uploaded
id *Required string
Pattern: ^[a-zA-Z0-9][a-zA-Z0-9_-]*
Min length: 1
region *Required string
AWS region with which the SDK should communicate.
type *Required object
accessKeyId string
Access Key Id in order to connect to AWS.
If no credentials are defined, we will use the default credentials provider chain to fetch credentials.
chunk integer | string
Default: 1000
The chunk size for every bulk request.
endpointOverride string
The endpoint with which the SDK should communicate.
This property allows you to use a different S3 compatible storage backend.
format string
Default: JSON
Possible values: ION, JSON
Format of the exported files
The format of the exported files
logFilePrefix string
Default: kestra-log-file
Prefix of the log files
The prefix of the log file names. The full file name will be logFilePrefix-localDateTime.json/ion
maxLinesPerFile integer | string
Default: 100000
Maximum number of lines per file
The maximum number of lines per file
secretKeyId string
Secret Key Id in order to connect to AWS.
If no credentials are defined, we will use the default credentials provider chain to fetch credentials.
sessionToken string
AWS session token, retrieved from an AWS token service, used for authenticating that this user has received temporary permissions to access a given resource.
If no credentials are defined, we will use the default credentials provider chain to fetch credentials.
stsEndpointOverride string
The AWS STS endpoint with which the SDKClient should communicate.
stsRoleArn string
AWS STS Role.
The Amazon Resource Name (ARN) of the role to assume. If set the task will use the StsAssumeRoleCredentialsProvider. If no credentials are defined, we will use the default credentials provider chain to fetch credentials.
stsRoleExternalId string
AWS STS External Id.
A unique identifier that might be required when you assume a role in another account. This property is only used when an stsRoleArn is defined.
stsRoleSessionDuration string
Default: PT15M
Format: duration
AWS STS Session duration.
The duration of the role session (default: 15 minutes, i.e., PT15M). This property is only used when an stsRoleArn is defined.
stsRoleSessionName string
AWS STS Session name.
This property is only used when an stsRoleArn is defined.
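As a sketch of the STS properties above, the following logExporters entry assumes a role instead of passing static access keys. The role ARN, external ID secret, and session name are hypothetical values used only for illustration.

```yaml
- id: s3AssumeRole
  type: io.kestra.plugin.ee.aws.s3.LogExporter
  region: us-east-1
  bucket: logbucket
  format: JSON
  stsRoleArn: arn:aws:iam::123456789012:role/kestra-log-shipper  # hypothetical role ARN
  stsRoleExternalId: "{{ secret('AWS_EXTERNAL_ID') }}"            # hypothetical secret, only if the role requires it
  stsRoleSessionName: kestra-log-shipper                          # hypothetical session name
  stsRoleSessionDuration: PT30M                                   # overrides the PT15M default
```

Because accessKeyId and secretKeyId are omitted here, the base credentials used to assume the role would come from the default credentials provider chain, as described above.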
Export logs to Azure Monitor
endpoint *Required string
URL of the Data Collection Endpoint
id *Required string
Pattern: ^[a-zA-Z0-9][a-zA-Z0-9_-]*
Min length: 1
ruleId *Required string
ID of the Data Collection Rule
streamName *Required string
Name of the stream
type *Required object
chunk integer | string
Default: 1000
The chunk size for every bulk request.
clientId string
Client ID
Client ID of the Azure service principal. If you don't have a service principal, refer to create a service principal with Azure CLI.
clientSecret string
Client Secret
Service principal client secret. The tenantId, clientId and clientSecret of the service principal are required for this credential to acquire an access token.
pemCertificate string
PEM Certificate
Your stored PEM certificate.
The tenantId, clientId and clientCertificate of the service principal are required for this credential to acquire an access token.
tenantId string
Tenant ID
java.nio.charset.Charset
io.kestra.core.http.client.configurations.HttpConfiguration
allowFailed boolean | string
Default: false
If true, allow a failed response code (response code >= 400)
allowedResponseCodes array
List of response codes allowed for this request
auth BasicAuthConfiguration | BearerAuthConfiguration
The authentication to use.
defaultCharset Charset | string
Default: UTF-8
The default charset for the request.
followRedirects boolean | string
Default: true
Whether redirects should be followed automatically.
logs array
Possible values: REQUEST_HEADERS, REQUEST_BODY, RESPONSE_HEADERS, RESPONSE_BODY
The enabled logs.
proxy ProxyConfiguration
The proxy configuration.
ssl SslOptions
The SSL request options
timeout TimeoutConfiguration
The timeout configuration.
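The HTTP-based exporters (Datadog, New Relic, Splunk) expose this configuration through their options property. The sketch below shows one way the nested timeout, proxy, and logs settings might be written for the Datadog exporter; the proxy host and port are hypothetical, and the duration values are arbitrary examples.

```yaml
- id: datadog
  type: io.kestra.plugin.ee.datadog.LogExporter
  basePath: https://http-intake.logs.datadoghq.eu
  apiKey: "{{ secret('DATADOG_API_KEY') }}"
  options:
    followRedirects: false
    timeout:
      connectTimeout: PT10S        # fail if no connection is established within 10 seconds
      readIdleTimeout: PT1M        # overrides the PT5M default
    proxy:
      type: HTTP
      address: proxy.example.com   # hypothetical proxy host
      port: 3128                   # hypothetical proxy port
    logs:
      - REQUEST_HEADERS
      - RESPONSE_HEADERS
```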
io.kestra.core.models.tasks.Output
Export logs to AWS CloudWatch
id *Required string
Pattern: ^[a-zA-Z0-9][a-zA-Z0-9_-]*
Min length: 1
logGroupName *Required string
The name of the log group.
logStreamName *Required string
The name of the log stream
region *Required string
AWS region with which the SDK should communicate.
type *Required object
accessKeyId string
Access Key Id in order to connect to AWS.
If no credentials are defined, we will use the default credentials provider chain to fetch credentials.
chunk integer | string
Default: 1000
The chunk size for every bulk request.
endpointOverride string
The endpoint with which the SDK should communicate.
This property allows you to use a different S3 compatible storage backend.
secretKeyId string
Secret Key Id in order to connect to AWS.
If no credentials are defined, we will use the default credentials provider chain to fetch credentials.
sessionToken string
AWS session token, retrieved from an AWS token service, used for authenticating that this user has received temporary permissions to access a given resource.
If no credentials are defined, we will use the default credentials provider chain to fetch credentials.
stsEndpointOverride string
The AWS STS endpoint with which the SDKClient should communicate.
stsRoleArn string
AWS STS Role.
The Amazon Resource Name (ARN) of the role to assume. If set the task will use the StsAssumeRoleCredentialsProvider. If no credentials are defined, we will use the default credentials provider chain to fetch credentials.
stsRoleExternalId string
AWS STS External Id.
A unique identifier that might be required when you assume a role in another account. This property is only used when an stsRoleArn is defined.
stsRoleSessionDuration string
Default: PT15M
Format: duration
AWS STS Session duration.
The duration of the role session (default: 15 minutes, i.e., PT15M). This property is only used when an stsRoleArn is defined.
stsRoleSessionName string
AWS STS Session name.
This property is only used when an stsRoleArn is defined.
io.kestra.plugin.ee.elasticsearch.ElasticsearchConnection
hosts *Required array
Min items: 1
List of HTTP Elasticsearch servers.
Must be a URI like https://elasticsearch.com:9200 with scheme and port.
basicAuth ElasticsearchConnection-BasicAuth
Basic auth configuration.
headers array
List of HTTP headers to be sent on every request.
Each header must be a string with key and value separated by ":", e.g. Authorization: Token XYZ.
pathPrefix string
Sets the path's prefix for every request used by the HTTP client.
For example, if this is set to /my/path, then any client request will become /my/path/ + endpoint.
In essence, every request's endpoint is prefixed by this pathPrefix.
The path prefix is useful for when ElasticSearch is behind a proxy that provides a base path or a proxy that requires all paths to start with '/'; it is not intended for other purposes and it should not be supplied in other scenarios.
strictDeprecationMode boolean | string
Whether the REST client should return any response containing at least one warning header as a failure.
trustAllSsl boolean | string
Trust all SSL CA certificates.
Use this if the server is using a self-signed SSL certificate.
io.kestra.core.http.client.configurations.BearerAuthConfiguration
type *Required object
token string
The token for bearer token authentication.
Export logs to Splunk
host *Required string
Splunk host
URL of the Splunk host to export logs to
id *Required string
Pattern: ^[a-zA-Z0-9][a-zA-Z0-9_-]*
Min length: 1
token *Required string
Splunk token
Token used to authenticate to the Splunk API
type *Required object
chunk integer | string
Default: 1000
The chunk size for every bulk request.
options HttpConfiguration
The HTTP client configuration
source string
Default: Kestra
Log source
The source of the logs
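The multi-destination example earlier on this page does not include a Splunk entry. A logExporters entry built from the properties above might look like the sketch below. Note that the type identifier is an assumption modeled on the naming of the other exporters and is not confirmed by this page; the host URL and secret name are hypothetical as well.

```yaml
- id: splunk
  type: io.kestra.plugin.ee.splunk.LogExporter   # assumed type name, verify against your plugin list
  host: https://splunk.example.com:8088          # hypothetical Splunk host URL
  token: "{{ secret('SPLUNK_TOKEN') }}"          # hypothetical secret name
  source: Kestra                                 # same as the documented default
  chunk: 1000                                    # same as the documented default
```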
Export logs to Google Cloud Storage
bucket *Required string
GCS bucket to upload log files to.
The bucket where log files are going to be uploaded
id *Required string
Pattern: ^[a-zA-Z0-9][a-zA-Z0-9_-]*
Min length: 1
type *Required object
chunk integer | string
Default: 1000
The chunk size for every bulk request.
format string
Default: JSON
Possible values: ION, JSON
Format of the exported files
The format of the exported files
impersonatedServiceAccount string
The GCP service account to impersonate.
logFilePrefix string
Default: kestra-log-file
Prefix of the log files
The prefix of the log file names. The full file name will be logFilePrefix-localDateTime.json/ion
maxLinesPerFile integer | string
Default: 100000
Maximum number of lines per file
The maximum number of lines per file
projectId string
The GCP project ID.
scopes array
Default: ["https://www.googleapis.com/auth/cloud-platform"]
The GCP scopes to be used.
serviceAccount string
The GCP service account key.
io.kestra.core.http.client.configurations.BasicAuthConfiguration
type *Required object
password string
The password for HTTP basic authentication.
username string
The username for HTTP basic authentication.
Export logs to an OpenTelemetry collector
id *Required string
Pattern: ^[a-zA-Z0-9][a-zA-Z0-9_-]*
Min length: 1
otlpEndpoint *Required string
OTLP endpoint
URL of the OTLP endpoint to export logs to
type *Required object
chunk integer | string
Default: 1000
The chunk size for every bulk request.
Export logs to Google Operational Suite
id *Required string
Pattern: ^[a-zA-Z0-9][a-zA-Z0-9_-]*
Min length: 1
type *Required object
chunk integer | string
Default: 1000
The chunk size for every bulk request.
impersonatedServiceAccount string
The GCP service account to impersonate.
projectId string
The GCP project ID.
scopes array
Default: ["https://www.googleapis.com/auth/cloud-platform"]
The GCP scopes to be used.
serviceAccount string
The GCP service account key.
io.kestra.core.http.client.configurations.SslOptions
insecureTrustAllCertificates boolean | string
Whether to disable checking of the remote SSL certificate.
Only applies if no trust store is configured. Note: This makes the SSL connection insecure and should only be used for testing. If you are using a self-signed certificate, set up a trust store instead.
Ship logs to a file inside Kestra's internal storage.
id *Required string
Pattern: ^[a-zA-Z0-9][a-zA-Z0-9_-]*
Min length: 1
type *Required object
format string
Default: ION
Possible values: ION, JSON
Format of the exported files
This property defines the format of the exported files.
logFilePrefix string
Default: kestra-log-file
Prefix of the log files
This property sets the prefix of the log file names. The full file name will be logFilePrefix-localDateTime.json/ion.
maxLinesPerFile integer | string
Maximum number of lines per file
This property specifies the maximum number of lines per log file.
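A minimal sketch of a file exporter entry that overrides the defaults listed above is shown here. The prefix and line limit are arbitrary example values, not recommendations.

```yaml
- id: file
  type: io.kestra.plugin.ee.core.log.FileLogExporter
  format: JSON                   # default is ION
  logFilePrefix: kestra-audit    # hypothetical prefix, default is kestra-log-file
  maxLinesPerFile: 50000         # arbitrary example limit
```

With these settings, file names would follow the documented logFilePrefix-localDateTime pattern with a .json extension.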
Ship logs to a Datadog instance.
apiKey *Required string
API key
API key used to log in to the Datadog instance
basePath *Required string
Datadog base path
Base path of the Datadog instance
id *Required string
Pattern: ^[a-zA-Z0-9][a-zA-Z0-9_-]*
Min length: 1
type *Required object
chunk integer | string
Default: 1000
The chunk size for every bulk request.
options HttpConfiguration
The HTTP client configuration
service string
Default: LogExporter
Log sending service
Name of the service that sends logs
source string
Default: Kestra
Log source
The source of the logs
io.kestra.plugin.ee.elasticsearch.ElasticsearchConnection-BasicAuth
password string
Basic auth password.
username string
Basic auth username.
io.kestra.plugin.ee.opensearch.OpensearchConnection-BasicAuth
password string
Basic auth password.
username string
Basic auth username.
Ship logs to New Relic
apiKey *Required string
Authentication key
API key or license key used to log in to the New Relic instance
basePath *Required string
New Relic base path
Base path of the New Relic instance to send logs to
id *Required string
Pattern: ^[a-zA-Z0-9][a-zA-Z0-9_-]*
Min length: 1
type *Required object
chunk integer | string
Default: 1000
The chunk size for every bulk request.
options HttpConfiguration
The HTTP client configuration
Export logs to Azure Blob Storage
containerName *Required string
Name of the container
Name of the container in the blob storage
endpoint *Required string
URL of the Blob Storage
id *Required string
Pattern: ^[a-zA-Z0-9][a-zA-Z0-9_-]*
Min length: 1
type *Required object
chunk integer | string
Default: 1000
The chunk size for every bulk request.
clientId string
Client ID
Client ID of the Azure service principal. If you don't have a service principal, refer to create a service principal with Azure CLI.
clientSecret string
Client Secret
Service principal client secret. The tenantId, clientId and clientSecret of the service principal are required for this credential to acquire an access token.
connectionString string
Connection string of the Storage Account.
format string
Default: JSON
Possible values: ION, JSON
Format of the exported files
The format of the exported files
logFilePrefix string
Default: kestra-log-file
Prefix of the log files
The prefix of the log file names. The full file name will be logFilePrefix-localDateTime.json/ion
maxLinesPerFile integer | string
Default: 100000
Maximum number of lines per file
The maximum number of lines per file
pemCertificate string
PEM Certificate
Your stored PEM certificate.
The tenantId, clientId and clientCertificate of the service principal are required for this credential to acquire an access token.
sasToken string
The SAS token to use for authenticating requests.
This string should only be the query parameters (with or without a leading '?') and not a full URL.
tenantId string
Tenant ID
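The example earlier on this page authenticates to Azure Blob Storage with a service principal. As a sketch only, and assuming a SAS token can be supplied in place of the tenantId, clientId, and clientSecret fields, an entry might look like the following. The secret name is hypothetical.

```yaml
- id: azureBlobSas
  type: io.kestra.plugin.ee.azure.storage.LogExporter
  endpoint: https://myblob.blob.core.windows.net/
  containerName: logs
  sasToken: "{{ secret('AZURE_SAS_TOKEN') }}"   # hypothetical secret, query parameters only, not a full URL
  format: ION
  logFilePrefix: kestra-log-file
```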
Ship logs to OpenSearch
connection *Required OpensearchConnection
The connection properties.
id *Required string
Pattern: ^[a-zA-Z0-9][a-zA-Z0-9_-]*
Min length: 1
indexName *Required string
The name of the index to send logs to
type *Required object
chunk integer | string
Default: 1000
The chunk size for every bulk request.
