JobSink, triggering long-running background jobs when events occurs¶
Usually event processing combined with a Knative Service is expected to complete in a relative short period of time (minutes) as it requires the HTTP connection to stay open as otherwise the service is scaled down.
Keeping long-running connections open increases the possibility of failing and so the processing needs to restart as the request is retried.
This limitation is not ideal, JobSink is a resource you can use to create long-running
asynchronous jobs and tasks.
JobSink supports the full
Kubernetes batch/v1 Job resource and features
and Kubernetes Job queuing systems like Kueue.
Prerequisites¶
You must have access to a Kubernetes cluster with Knative Eventing installed.
Usage¶
When an event is sent to a JobSink, Eventing creates a Job and mounts the received event as
JSON file at /etc/jobsink-event/event.
- Create a JobSinkapiVersion: sinks.knative.dev/v1alpha1 kind: JobSink metadata: name: job-sink-logger spec: job: spec: completions: 1 parallelism: 1 template: spec: restartPolicy: Never containers: - name: main image: docker.io/library/bash:5 command: [ "cat" ] args: - "/etc/jobsink-event/event"
- Apply the JobSinkresource:kubectl apply -f <job-sink-file.yaml>
- Verify JobSinkis ready:Example output:kubectl get jobsinks.sinks.knative.devNAME URL AGE READY REASON job-sink-logger http://job-sink.knative-eventing.svc.cluster.local/default/job-sink-logger 5s True
- Trigger a JobSinkkubectl run curl --image=curlimages/curl --rm=true --restart=Never -ti -- -X POST -v \ -H "content-type: application/json" \ -H "ce-specversion: 1.0" \ -H "ce-source: my/curl/command" \ -H "ce-type: my.demo.event" \ -H "ce-id: 123" \ -d '{"details":"JobSinkDemo"}' \ http://job-sink.knative-eventing.svc.cluster.local/default/job-sink-logger
- Verify a Jobis created and prints the event:Example output:kubectl logs job-sink-loggerszoi6-dqbtq{"specversion":"1.0","id":"123","source":"my/curl/command","type":"my.demo.event","datacontenttype":"application/json","data":{"details":"JobSinkDemo"}}
JobSink idempotency¶
JobSink will create a job for each different received event.
An event is uniquely identified by the combination of event source and id attributes.
If an event with the same source and id attributes is received and a job is already present,
another Job will not be created.
Reading the event file¶
You can read the file and deserialize it using any CloudEvents JSON deserializer.
For example, the following snippet reads an event using the CloudEvents Go SDK and processes it.
package mytask
import (
    "encoding/json"
    "fmt"
    "os"
    cloudevents "github.com/cloudevents/sdk-go/v2"
)
func handleEvent() error {
    eventBytes, err := os.ReadFile("/etc/jobsink-event/event")
    if err != nil {
        return err
    }
    event := &cloudevents.Event{}
    if err := json.Unmarshal(eventBytes, event); err != nil {
        return err
    }
    // Process event ...
    fmt.Println(event)
    return nil
}
Trigger a Job from different event sources¶
A JobSink can be triggered by any event source or trigger.
For example, you can trigger a Job when a Kafka record is sent to a Kafka topic using
a KafkaSource:
apiVersion: sources.knative.dev/v1
kind: KafkaSource
metadata:
  name: kafka-source
spec:
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topics:
    - knative-demo-topic
  sink:
    ref:
      apiVersion: sinks.knative.dev/v1alpha1
      kind: JobSink
      name: job-sink-logger
or when Knative Broker receives an event using a Trigger:
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: my-job-sink-trigger
spec:
  broker: my-broker
  filter:
    attributes:
      type: dev.knative.foo.bar
      myextension: my-extension-value
    subscriber:
      ref:
        apiVersion: sinks.knative.dev/v1alpha1
        kind: JobSink
        name: job-sink-logger
or even as dead letter sink for a Knative Broker
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  name: my-broker
spec:
  # ...
  delivery:
    deadLetterSink:
      ref:
        apiVersion: sinks.knative.dev/v1alpha1
        kind: JobSink
        name: job-sink-logger
    retry: 5
    backoffPolicy: exponential
    backoffDelay: "PT1S"
Customizing the event file directory¶
apiVersion: sinks.knative.dev/v1alpha1
kind: JobSink
metadata:
  name: job-sink-custom-mount-path
spec:
  job:
    spec:
      completions: 1
      parallelism: 1
      template:
        spec:
          restartPolicy: Never
          containers:
            - name: main
              image: docker.io/library/bash:5
              command: [ "bash" ]
              args:
                - -c
                - echo "Hello world!" && sleep 5
              # The event will be available in a file at `/etc/custom-path/event`
              volumeMounts:
                - name: "jobsink-event"
                  mountPath: "/etc/custom-path"
                  readOnly: true
Cleaning up finished jobs¶
To clean up finished jobs, you can set
the spec.job.spec.ttlSecondsAfterFinished: 600 field
and Kubernetes will remove finished jobs after 600 seconds (10 minutes).
JobSink examples¶
JobSink success example¶
apiVersion: sinks.knative.dev/v1alpha1
kind: JobSink
metadata:
  name: job-sink-success
spec:
  job:
    metadata:
      labels:
        my-label: my-value
    spec:
      completions: 12
      parallelism: 3
      template:
        spec:
          restartPolicy: Never
          containers:
            - name: main
              image: docker.io/library/bash:5
              command: [ "bash" ]
              args:
                - -c
                - echo "Hello world!" && sleep 5
      backoffLimit: 6
      podFailurePolicy:
        rules:
          - action: FailJob
            onExitCodes:
              containerName: main      # optional
              operator: In             # one of: In, NotIn
              values: [ 42 ]
          - action: Ignore             # one of: Ignore, FailJob, Count
            onPodConditions:
              - type: DisruptionTarget   # indicates Pod disruption
JobSink failure example¶
apiVersion: sinks.knative.dev/v1alpha1
kind: JobSink
metadata:
  name: job-sink-failure
spec:
  job:
    metadata:
      labels:
        my-label: my-value
    spec:
      completions: 12
      parallelism: 3
      template:
        spec:
          restartPolicy: Never
          containers:
            - name: main
              image: docker.io/library/bash:5
              command: [ "bash" ]        # example command simulating a bug which triggers the FailJob action
              args:
                - -c
                - echo "Hello world!" && sleep 5 && exit 42
      backoffLimit: 6
      podFailurePolicy:
        rules:
          - action: FailJob
            onExitCodes:
              containerName: main      # optional
              operator: In             # one of: In, NotIn
              values: [ 42 ]
          - action: Ignore             # one of: Ignore, FailJob, Count
            onPodConditions:
              - type: DisruptionTarget   # indicates Pod disruption