Explore Knative Eventing

I recently did some hands-on experiments with Knative Eventing. This post summarises what I did, for future reference.

The components involved and developed are shown in the following diagram.

A ContainerSource is used as the event source to monitor a Minio-based object storage. Once a new image file is uploaded to a bucket, a cloud event is triggered and sent to Knative Eventing. When the event arrives, the broker delivers it to the consumers that subscribed to it through trigger filtering.

The consumers are all Knative Serving Services. The first consumer is the standard event display, used to monitor and debug events. When an event arrives, the second consumer fetches the image file from Minio, resizes it, and pushes the resized image into another bucket. The third consumer calls the Watson Visual Recognition API to classify the image and sends the tags of the image back to the broker as a reply event.

Knative Eventing is at version 0.6.0, as released in Kabanero. The Kubernetes cluster is OpenShift 3.11.

Let's install Minio on OpenShift with Helm 3. Download the beta release and add the stable repo.

curl -LO https://get.helm.sh/helm-v3.0.0-beta.3-linux-amd64.tar.gz
tar zxvf helm-v3.0.0-beta.3-linux-amd64.tar.gz
sudo mv linux-amd64/helm /usr/local/bin
helm repo add stable https://kubernetes-charts.storage.googleapis.com

Create a new namespace and service account. Assign the privileged security context constraint, as the container needs to run as root.

oc new-project minio
oc create sa minio
oc adm policy add-scc-to-user privileged -z minio

Prepare the following values.yaml file for the chart,

existingSecret: ""
accessKey:
secretKey:
serviceAccount:
  create: false
  name: minio
persistence:
  enabled: true
  accessMode: ReadWriteOnce
  size: 50Gi
  subPath: ""
ingress:
  enabled: true
  path: /
  hosts:
    - minio.apps.ocp.io.cpak

A GlusterFS based storage class was created for the cluster and set as the default storage class. Install the chart with the command,

helm install minio stable/minio -f values.yaml -n minio
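
For reference, the default GlusterFS storage class mentioned above could be defined roughly as follows. This is only a sketch; the class name and the heketi resturl are assumptions about the environment, not part of the original setup.

apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: glusterfs-storage
  annotations:
    storageclass.kubernetes.io/is-default-class: "true"
provisioner: kubernetes.io/glusterfs
parameters:
  resturl: "http://heketi-storage.glusterfs.svc:8080"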

In order to set the container user to root (the Minio configuration file is created under /root/.minio), patch the security context of the deployment,

oc patch deployment minio --type=json -p='[{"op": "add", "path": "/spec/template/spec/securityContext/runAsUser", "value": 0}]'

Get the access key and secret key for login.

oc get secret minio -o jsonpath="{ .data.accesskey }" | base64 -d -i; echo
oc get secret minio -o jsonpath="{ .data.secretkey }" | base64 -d -i; echo

When all the pods are ready, go to the ingress URL to access the Minio browser and log in with the access key and secret key.

ContainerSource provides a generic way to bring any custom cloud event into Knative Eventing.

When the ContainerSource controller reconciles the configuration and runs the pod, it passes a sink argument on the command line. This is the target sink the events should be sent to. We initialize the cloud event client as below,

ceClient, err = kncloudevents.NewDefaultClient(config.Sink)
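
A minimal sketch of how the injected --sink argument can be wired into that client. The flag name follows the controller behaviour described above; the package main scaffolding and the kncloudevents import path are assumptions based on the eventing-sources release used here.

package main

import (
    "flag"
    "log"

    "github.com/knative/eventing-sources/pkg/kncloudevents"
)

// Config holds the values supplied to the container at runtime.
type Config struct {
    Sink string
}

var config Config

func main() {
    // The ContainerSource controller appends --sink=<broker URL> to the args.
    flag.StringVar(&config.Sink, "sink", "", "URI to send cloud events to")
    flag.Parse()

    // Cloud event client targeting the sink (the default broker).
    ceClient, err := kncloudevents.NewDefaultClient(config.Sink)
    if err != nil {
        log.Fatalf("failed to create cloud event client: %v", err)
    }
    _ = ceClient // used once Minio notifications start arriving
}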

In the case of Minio, we call the Minio SDK API ListenBucketNotification to get notified of any s3:ObjectCreated:* changes. Upon receiving a notification, we fire the cloud event as below,

data := Data{
    Bucket: e.S3.Bucket.Name,
    Name:   e.S3.Object.Key,
    Size:   e.S3.Object.Size,
}
cEvent := cloudevents.Event{
    Context: cloudevents.EventContextV02{
        Type:       eventType,
        Source:     *types.ParseURLRef(eventSource),
        Extensions: map[string]interface{}{},
    }.AsV02(),
    Data: data,
}
if _, _, err := ceClient.Send(context.Background(), cEvent); err != nil {
    log.Printf("failed to send cloudevent: %s. Continue", err.Error())
}
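
For completeness, a hedged sketch of the notification loop that drives that Send call, assuming the minio-go client API. Here minioClient, bucket, and the fireEvent helper are illustrative names, not taken from the original code.

doneCh := make(chan struct{})
defer close(doneCh)

// minioClient is a *minio.Client initialized with MINIO_SERVER/KEY/SECRET.
// Listen for object-created notifications on the configured bucket.
for info := range minioClient.ListenBucketNotification(bucket, "", "", []string{
    "s3:ObjectCreated:*",
}, doneCh) {
    if info.Err != nil {
        log.Printf("notification error: %s. Continue", info.Err.Error())
        continue
    }
    for _, e := range info.Records {
        // e.S3.Bucket.Name, e.S3.Object.Key and e.S3.Object.Size feed the
        // Data struct and the cloud event constructed above.
        fireEvent(e) // hypothetical helper wrapping the Send call above
    }
}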

We can define the eventType and eventSource through command line arguments. They can then be used to filter the events in the trigger later.

Create the following Dockerfile.

FROM golang as builder
WORKDIR /build
COPY ./src /build
RUN CGO_ENABLED=0 go build -o postEventSource *.go

FROM alpine
WORKDIR /app
COPY --from=builder /build/postEventSource /app
ENTRYPOINT ["./postEventSource"]

Notice that the last line uses ENTRYPOINT. When the container runs, the controller only needs to supply command arguments, such as --sink, without knowing which executable to run, which makes the image generic enough for any other ContainerSource.

Build the docker image and push it into the Openshift docker registry.
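
A hedged sketch of that step. It assumes the internal registry address used in the manifests below, that docker can log in with the current OpenShift token, and that the kevent-exp project (created in the next step) already exists.

docker build -t docker-registry.default.svc:5000/kevent-exp/kevent-exp-minio-event-source:latest .
docker login -u $(oc whoami) -p $(oc whoami -t) docker-registry.default.svc:5000
docker push docker-registry.default.svc:5000/kevent-exp/kevent-exp-minio-event-source:latest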

Create the target namespace,

oc new-project kevent-exp

Create the default broker under the namespace with the following YAML.

apiVersion: eventing.knative.dev/v1alpha1
kind: Broker
metadata:
  name: default
  namespace: kevent-exp
spec:
  channelTemplate:
    provisioner:
      apiVersion: eventing.knative.dev/v1alpha1
      kind: ClusterChannelProvisioner
      name: in-memory-channel

Notice I am using the v0.6.0 format. Although there is supposed to be an automated way of creating the broker by annotating the namespace, it does not seem to create the broker successfully. I had to create the broker with the provisioner specified explicitly and, at the same time, label the namespace with knative-eventing-injection=enabled.
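
Concretely, that means applying the broker YAML above and labelling the namespace (broker.yaml here is simply whatever file holds the YAML above):

kubectl apply -f broker.yaml
kubectl label namespace kevent-exp knative-eventing-injection=enabled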

Check that the broker and the channels are ready,

kubectl get broker,channel
NAME                                                 READY   REASON   HOSTNAME                                       AGE
broker.eventing.knative.dev/default                  True             default-broker.kevent-exp.svc.cluster.local    2d

NAME                                                 READY   REASON   AGE
channel.eventing.knative.dev/default-broker-b95kt    True             2d
channel.eventing.knative.dev/default-broker-lhdgg    True             2d

Apply the following config,

apiVersion: sources.eventing.knative.dev/v1alpha1
kind: ContainerSource
metadata:
  name: minio-event-source
spec:
  image: docker-registry.default.svc:5000/kevent-exp/kevent-exp-minio-event-source:latest
  env:
    - name: MINIO_SERVER
      value: "minio.minio.svc:9000"
    - name: MINIO_KEY
      value: "9...<skipped>...M"
    - name: MINIO_SECRET
      value: "F...<skipped>...A"
    - name: MINIO_BUCKET
      value: "images"
  sink:
    apiVersion: eventing.knative.dev/v1alpha1
    kind: Broker
    name: default

The containersource custom resource, the deployment, and the pod should be created and ready. If we check the details of the pod, we will find,

spec:
  containers:
  - args:
    - --sink=http://default-broker.kevent-exp.svc.cluster.local/
    env:
    ...

The controller supplies the sink URL, based on the YAML definition, as the argument to the container's ENTRYPOINT command.

Now we are ready to consume the event.

Deploy the event display image as a Knative Serving Service,

apiVersion: serving.knative.dev/v1alpha1
kind: Service
metadata:
  name: event-display
spec:
  template:
    spec:
      containers:
        - image: gcr.io/knative-releases/github.com/knative/eventing-sources/cmd/event_display

Create a Trigger to subscribe to all the events from the default broker,

apiVersion: eventing.knative.dev/v1alpha1
kind: Trigger
metadata:
  name: trigger-for-ev-display
  namespace: kevent-exp
spec:
  broker: default
  filter:
    sourceAndType:
      type: ""
      source: ""
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1alpha1
      kind: Service
      name: event-display

Here, we set the broker, which defines where the events come from. We also set what type and source of events we want to subscribe to, which is all events in the example above. The subscriber, or consumer, is the event-display service, which prints the events to its container log.

Upload an image file with the Minio Browser. Check the event-display pod log,

☁️ cloudevents.Event
Validation: valid
Context Attributes,
  specversion: 0.2
  type: dev.knative.eventing.zhimin.minio
  source: minio
  id: ca9d887c-c3fd-4270-b85e-1424a9e1e421
  time: 2019-10-07T12:21:09.242099308Z
  contenttype: application/json
Extensions,
  knativehistory: default-broker-b95kt-channel-zjlww.kevent-exp.svc.cluster.local
Data,
  {
    "Bucket": "images",
    "Name": "office-1209640_1920.jpg",
    "Size": 582749
  }

Notice the type and source are what we expected. The data is decoded successfully.

Now let's create a service that resizes the image and uploads it to the resized bucket.

The event receiving logic is listed below,

c, err := kncloudevents.NewDefaultClient()
if err != nil {
    log.Fatalf("Failed to create client: %v", err)
}
err = c.StartReceiver(context.Background(), process)
if err != nil {
    log.Fatalf("Failed to start receiver: %v", err)
}

The process() function is how we handle the event. To decode the event, we call,

var data Data
err := event.DataAs(&data)
if err != nil {
    log.Printf("Failed to convert data type: %v", err)
    return
}

Once we get the event data, we resize the image and upload it into a different Minio bucket.
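
A hedged sketch of that step, assuming the minio-go client for download/upload and the nfnt/resize package for the resizing itself. The 800-pixel width, the resizeAndUpload name, and the JPEG-only handling are illustrative, not taken from the original code.

package main

import (
    "bytes"
    "image/jpeg"

    "github.com/minio/minio-go"
    "github.com/nfnt/resize"
)

// Data mirrors the payload of the Minio cloud event shown earlier.
type Data struct {
    Bucket string
    Name   string
    Size   int64
}

func resizeAndUpload(mc *minio.Client, data Data, targetBucket string) error {
    // Download the original object from the bucket named in the event.
    obj, err := mc.GetObject(data.Bucket, data.Name, minio.GetObjectOptions{})
    if err != nil {
        return err
    }
    defer obj.Close()

    img, err := jpeg.Decode(obj)
    if err != nil {
        return err
    }

    // Resize to an illustrative 800px width, keeping the aspect ratio.
    resized := resize.Resize(800, 0, img, resize.Lanczos3)

    var buf bytes.Buffer
    if err := jpeg.Encode(&buf, resized, nil); err != nil {
        return err
    }

    // Upload into the target bucket (MINIO_TARGET_BUCKET, i.e. "resized").
    _, err = mc.PutObject(targetBucket, data.Name, &buf, int64(buf.Len()),
        minio.PutObjectOptions{ContentType: "image/jpeg"})
    return err
}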

Build the docker image with the following Dockerfile,

FROM golang as builder
WORKDIR /build
COPY ./src /build
RUN CGO_ENABLED=0 go build -o serving *.go

FROM alpine
WORKDIR /app
COPY --from=builder /build/serving /app
CMD ["./serving"]

Push it into the registry and deploy it as a Knative Service.

apiVersion: serving.knative.dev/v1alpha1
kind: Service
metadata:
  name: kevent-exp-image-resize
  namespace: kevent-exp
spec:
  template:
    spec:
      containers:
        - image: docker-registry.default.svc:5000/kevent-exp/kevent-exp-serving-image-resize
          env:
            - name: MINIO_SERVER
              value: "minio.minio.svc:9000"
            - name: MINIO_KEY
              value: "...SKIPPED..."
            - name: MINIO_SECRET
              value: "...SKIPPED..."
            - name: MINIO_TARGET_BUCKET
              value: "resized"

Now create a trigger to consume the event,

apiVersion: eventing.knative.dev/v1alpha1
kind: Trigger
metadata:
  name: trigger-for-image-resizing
  namespace: kevent-exp
spec:
  broker: default
  filter:
    sourceAndType:
      type: dev.knative.eventing.zhimin.minio
      source: minio
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1alpha1
      kind: Service
      name: kevent-exp-image-resize

Upload an image and verify that the resized image appears in the “resized” bucket.

Let’s call Watson Visual Recognition API to tag the image. (The code is skipped)

Once the image classification is done, we reply with another cloud event that is dispatched back to the broker. The new event can be crafted with the following code,

r := cloudevents.NewEvent()
r.SetSource(fmt.Sprintf("https://knative.dev/zhimin/imageClassified"))
r.SetType("dev.knative.eventing.zhimin.image.classify")
r.SetID(event.Context.GetID())
r.SetData(ClassifyResultEventData{
    Bucket:     data.Bucket,
    ImageName:  data.Name,
    Classifies: tags,
})
resp.RespondWith(200, &r)
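
For context, the reply travels through the response argument of the receiver function. With the cloudevents SDK used above, a handler that can reply looks roughly like this (a sketch; the body is elided here):

// process receives the Minio event and replies with the classification event.
// resp is what RespondWith writes the reply event into.
func process(ctx context.Context, event cloudevents.Event, resp *cloudevents.EventResponse) error {
    // ...decode with event.DataAs, call Watson, then build r and
    // call resp.RespondWith(200, &r) as shown above...
    return nil
}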

A new trigger that filters by the type dev.knative.eventing.zhimin.image.classify and the source https://knative.dev/zhimin/imageClassified can be created against the default broker to consume this new event for further processing in sequence.
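
Such a trigger would look like the following sketch; the trigger name and the downstream service name next-processor are illustrative.

apiVersion: eventing.knative.dev/v1alpha1
kind: Trigger
metadata:
  name: trigger-for-classified-images
  namespace: kevent-exp
spec:
  broker: default
  filter:
    sourceAndType:
      type: dev.knative.eventing.zhimin.image.classify
      source: https://knative.dev/zhimin/imageClassified
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1alpha1
      kind: Service
      name: next-processor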

Build the docker image, push into the registry, and create the Knative service accordingly. (Dockerfile and Ksvc YAML file are skipped)

Create the following trigger to let the event flow to the service.

apiVersion: eventing.knative.dev/v1alpha1
kind: Trigger
metadata:
  name: trigger-for-watson-classify
  namespace: kevent-exp
spec:
  broker: default
  filter:
    sourceAndType:
      type: dev.knative.eventing.zhimin.minio
      source: minio
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1alpha1
      kind: Service
      name: kevent-exp-watson-classify

Once the trigger is applied, validate that the respective subscription is created and check its details with kubectl get subscriptions default-trigger-for-watson-classify-qk4m5 -o yaml. We will see the reply portion,

spec:
  channel:
    apiVersion: eventing.knative.dev/v1alpha1
    kind: Channel
    name: default-broker-b95kt
  reply:
    channel:
      apiVersion: eventing.knative.dev/v1alpha1
      kind: Channel
      name: default-broker-lhdgg
  subscriber:
    uri: http://default-broker-filter.kevent-exp.svc.cluster.local/triggers/kevent-exp/trigger-for-watson-classify/51d915b1-e845-11e9-b216-06069c3cc452

Test with a new image. In the event-display log, the new event type and source are shown as expected. The event data carries the Watson classification result and can be consumed by another service.

☁️ cloudevents.Event
Validation: valid
Context Attributes,
  specversion: 0.2
  type: dev.knative.eventing.zhimin.image.classify
  source: https://knative.dev/zhimin/imageClassified
  id: a82280da-3510-4a4e-bfc3-f5eac65e5031
  time: 2019-10-07T13:28:24.71515201Z
  contenttype: application/json
Extensions,
  knativehistory: default-broker-b95kt-channel-zjlww.kevent-exp.svc.cluster.local
Data,
  {
    "Bucket": "images",
    "ImageName": "albert-einstein-1144965_1920.jpg",
    "Classifies": "person (0.801000),ash grey color (0.751000),black color (0.685000)"
  }

I tried to create a separate channel, say channel-classify-next, and subscriptions to form a pipeline simulating sequential event processing. However, even after updating the reply field to the new channel as below,

apiVersion: eventing.knative.dev/v1alpha1
kind: Trigger
metadata:
  name: trigger-for-watson-classify
  namespace: kevent-exp
spec:
  broker: default
  filter:
    sourceAndType:
      type: dev.knative.eventing.zhimin.minio
      source: minio
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1alpha1
      kind: Service
      name: kevent-exp-watson-classify
  reply:
    channel:
      apiVersion: eventing.knative.dev/v1alpha1
      kind: Channel
      name: channel-classify-next

After reconciling, the reply field still points back to the default broker. It seems to be a restriction of version 0.6.0 (?)

Conclusion

Messaging is a cornerstone of traditional enterprise applications, and many successful patterns have been developed around it. Knative Eventing brings the eventing/messaging concept back to Kubernetes and is maturing quickly with its fast development pace. This will definitely have a positive impact on the cloud adoption journey of enterprises.

Cloud explorer