Setting up knative eventing
source link: https://willschenk.com/articles/2021/setting_up_knative_eventing/
Let's send some messages
Published December 13, 2021 #knative, #kubernetes, #eventing
Here's a walkthrough of how to set up knative eventing. I couldn't find this information in one place, and it took longer than I expected to sort it out.
There are a couple of ways you can use eventing. One is to send messages directly to specific services. I walk through this below as a direct event.
Another is to use a broker and a trigger. A broker is basically the place you send stuff to, and then you define specific listeners by creating a trigger.
A third way is to use a channel and then create a subscription to that channel.
What's the difference?
Brokers/Triggers and Channels/Subscriptions are very, very similar. The code that you use to send an event (in knative parlance, an event source) is exactly the same, and the code you use to receive an event (an event sink) is exactly the same. The difference is in the deployment configuration and in which underlying service provides the event routing.
Note that the sources that are described in the knative documentation are the default ones, but writing your own is as simple as creating a binding (which gives you a K_SINK environment variable which will target your event), so it doesn't need to be all that complicated.
Also a bit confusing: brokers are themselves sinks as well as sources.
With brokers and triggers, you can set up event type filtering on the trigger itself so that your service will only see messages of a specific type.
For channels, I think you need to do the filtering on the receiving side.
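If the filtering does have to happen on the receiving side, it can be a small guard in front of the handler. Here is a minimal sketch, assuming events arrive as plain hashes like the ones ruby-echo prints further down. ACCEPTED_TYPES and accept_event? are hypothetical names for illustration, not part of knative or the cloud_events gem.

```ruby
require "set"

# Hypothetical allow-list of event types this service cares about.
ACCEPTED_TYPES = Set["dev.knative.sources.ping"].freeze

# Returns true when the event's "type" attribute is on the allow-list.
def accept_event?(event, accepted = ACCEPTED_TYPES)
  accepted.include?(event["type"])
end

ping  = { "type" => "dev.knative.sources.ping", "source" => "/apis/v1/namespaces/default/pingsources/heartbeat-source" }
other = { "type" => "com.example.someevent", "source" => "/mycontext" }

# Only the ping event survives the filter.
kept = [ping, other].select { |e| accept_event?(e) }
puts kept.length
```

A receiver could call a guard like this at the top of its POST handler and ignore anything that does not match.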
Install knative-eventing
Since we've set up the knative operator previously, let's tell it to turn on eventing.
eventing.yaml
apiVersion: v1
kind: Namespace
metadata:
  name: knative-eventing
---
apiVersion: operator.knative.dev/v1alpha1
kind: KnativeEventing
metadata:
  name: knative-eventing
  namespace: knative-eventing
And then apply:
kubectl apply -f eventing.yaml
namespace/knative-eventing configured
knativeeventing.operator.knative.dev/knative-eventing unchanged
Check the status of the deployment
kubectl get deployment -n knative-eventing
Deploy the test services
Create the echo service
(Code is below)
kn service create ruby-echo --image wschenk/ruby-echo -a networking.knative.dev/disableAutoTLS=true
Create the send service
(Code is below)
kn service create ruby-send --image wschenk/ruby-send -a networking.knative.dev/disableAutoTLS=true
Receiving events
Direct event test
Create the source
This creates an event source that sends an event every minute. We tell it to send the message directly to ruby-echo, the service that we just created.
kn source ping create heartbeat-source \
--schedule "*/1 * * * *" \
--sink ksvc:ruby-echo
Ping source 'heartbeat-source' created in namespace 'default'.
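The --schedule value is a standard five-field cron expression, and */1 in the first field means every minute. A small sketch that labels the fields follows. cron_fields is a hypothetical helper for illustration only, not something kn provides.

```ruby
# Field names of a standard five-field cron expression, in order.
CRON_FIELDS = %i[minute hour day_of_month month day_of_week].freeze

# Splits a cron expression into a hash keyed by field name.
def cron_fields(expression)
  parts = expression.split
  unless parts.length == CRON_FIELDS.length
    raise ArgumentError, "expected #{CRON_FIELDS.length} fields, got #{parts.length}"
  end
  CRON_FIELDS.zip(parts).to_h
end

fields = cron_fields("*/1 * * * *")
puts fields[:minute]
```

Changing the first field to */5, for example, would make the source fire every five minutes instead.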
View results
This is deployed on my domain; yours will be different.
curl http://ruby-echo.default.gitgratitude.com
Hello from the ruby echo service {"specversion"=>"1.0", "id"=>"0dd72dfa-eac9-40a1-b48b-00df271cd62e", "source"=>"/apis/v1/namespaces/default/pingsources/heartbeat-source", "type"=>"dev.knative.sources.ping", "data_encoded"=>"", "data"=>"", "time"=>"2021-12-14T18:50:00.060257784Z"}
Cleanup
kn source ping delete heartbeat-source
Ping source 'heartbeat-source' deleted in namespace 'default'.
Broker and Trigger test
Create the Broker
kn broker create default
Broker 'default' successfully created in namespace 'default'.
Create the source
Here we are using the ping source again, but pointing it to broker:default instead of the service.
kn source ping create heartbeat-source \
--schedule "*/1 * * * *" \
--sink broker:default
Ping source 'heartbeat-source' created in namespace 'default'.
Create the trigger
Now that we are pinging the broker, we need to tell the broker to send that message type to our service, ruby-echo.
kn trigger create heartbeat-trigger \
--broker default \
--filter type=dev.knative.sources.ping \
--sink ksvc:ruby-echo
Trigger 'heartbeat-trigger' successfully created in namespace 'default'.
View results
curl http://ruby-echo.default.gitgratitude.com
Hello from the ruby echo service {"specversion"=>"1.0", "id"=>"1aef1313-e8ea-4b14-9d91-691f6ecbcc03", "source"=>"/apis/v1/namespaces/default/pingsources/heartbeat-source", "type"=>"dev.knative.sources.ping", "data_encoded"=>"", "data"=>"", "time"=>"2021-12-14T18:57:00.319157126Z", "knativearrivaltime"=>"2021-12-14T18:57:00.319736536Z"}
Cleanup
kn broker delete default
kn source ping delete heartbeat-source
kn trigger delete heartbeat-trigger
Broker 'default' successfully deleted in namespace 'default'.
Ping source 'heartbeat-source' deleted in namespace 'default'.
Trigger 'heartbeat-trigger' deleted in namespace 'default'.
Channel test
Channel
Simple.
kn channel create heartbeat
Channel 'heartbeat' created in namespace 'default'.
Ping the heartbeat channel
This is sending to channel:heartbeat
kn source ping create heartbeat-source \
--schedule "*/1 * * * *" \
--sink channel:heartbeat
Ping source 'heartbeat-source' created in namespace 'default'.
Subscribe to the event
Here we create the subscription, which goes to our service.
kn subscription create heartbeat-sub \
--channel heartbeat \
--sink ruby-echo
Subscription 'heartbeat-sub' created in namespace 'default'.
See the results
curl http://ruby-echo.default.gitgratitude.com
Hello from the ruby echo service {"specversion"=>"1.0", "id"=>"88171ffc-3008-4df4-98cd-c157ef5a1aea", "source"=>"/apis/v1/namespaces/default/pingsources/heartbeat-source", "type"=>"dev.knative.sources.ping", "data_encoded"=>"", "data"=>"", "time"=>"2021-12-14T18:59:00.082372715Z"}
Cleanup
kn channel delete heartbeat
kn source ping delete heartbeat-source
kn subscription delete heartbeat-sub
Channel 'heartbeat' deleted in namespace 'default'.
Ping source 'heartbeat-source' deleted in namespace 'default'.
Subscription 'heartbeat-sub' deleted in namespace 'default'.
Sending events
Now we can look at creating and sending events. The receiving side is
the same, but we need to create a service that will send a message.
In each scenario this will be the same code, but depending upon how we deploy it, it will have a different K_SINK value in the environment.
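To make the role of K_SINK concrete, here is a rough sketch of a send done with only the standard library, using the CloudEvents HTTP binary content mode, where the attributes travel as ce- prefixed headers. build_event_request is a hypothetical helper, and the localhost fallback URL is an assumption for running it outside a cluster, not something a binding sets. The id is generated with SecureRandom.uuid rather than hard-coded, since the CloudEvents spec expects source plus id to be unique per event.

```ruby
require "json"
require "net/http"
require "securerandom"
require "uri"

# Builds (but does not send) a POST carrying a CloudEvent in binary content mode.
def build_event_request(sink_url, type:, source:, data:)
  request = Net::HTTP::Post.new(URI(sink_url))
  request["ce-specversion"] = "1.0"
  request["ce-id"]          = SecureRandom.uuid
  request["ce-type"]        = type
  request["ce-source"]      = source
  request["content-type"]   = "application/json"
  request.body = JSON.generate(data)
  request
end

# K_SINK is injected by the binding; the fallback is an assumption for local runs.
sink = ENV.fetch("K_SINK", "http://localhost:8080/")
request = build_event_request(sink, type: "com.example.someevent", source: "/mycontext", data: { message: "Hello" })
puts request["ce-type"]
```

Whether K_SINK points at a service, a broker, or a channel, the request looks the same. Only the URL changes.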
Direct binding
Create direct binding
kn source binding create direct-binding \
--subject Service:serving.knative.dev/v1:ruby-send \
--sink ruby-echo
Sink binding 'direct-binding' created in namespace 'default'.
curl http://ruby-echo.default.gitgratitude.com
Hello from the ruby echo service
curl "http://ruby-send.default.gitgratitude.com?message=Hello"
sent
curl http://ruby-echo.default.gitgratitude.com
Hello from the ruby echo service {"specversion"=>"1.0", "id"=>"1234-1234-1234", "source"=>"/mycontext", "type"=>"com.example.someevent", "data_encoded"=>"{\"message\":\"Hello\"}", "data"=>{"message"=>"Hello"}, "datacontenttype"=>"application/json"}
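In that output, data_encoded is the raw JSON payload as a string and data is the same payload after decoding, which works because datacontenttype is application/json. The relationship can be checked with the standard library alone. This is a sketch of the idea, not the gem's own decoding code.

```ruby
require "json"

# The raw payload string, as shown in the data_encoded attribute above.
data_encoded = "{\"message\":\"Hello\"}"

# Decoding it yields the hash shown in the data attribute.
data = JSON.parse(data_encoded)
puts data["message"]
```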
Clean up
kn source binding delete direct-binding
Through a broker
Create a broker
kn broker create default
Create the trigger
kn trigger create send-trigger \
--broker default \
--filter type=com.example.someevent \
--sink ksvc:ruby-echo
Trigger 'send-trigger' successfully created in namespace 'default'.
Create the binding
kn source binding create broker-binding \
--subject Service:serving.knative.dev/v1:ruby-send \
--sink broker:default
Sink binding 'broker-binding' created in namespace 'default'.
curl http://ruby-echo.default.gitgratitude.com
Hello from the ruby echo service
curl "http://ruby-send.default.gitgratitude.com?message=Hello"
sent
curl http://ruby-echo.default.gitgratitude.com
Hello from the ruby echo service {"specversion"=>"1.0", "id"=>"1234-1234-1234", "source"=>"/mycontext", "type"=>"com.example.someevent", "data_encoded"=>"{\"message\":\"Hello\"}", "data"=>{"message"=>"Hello"}, "datacontenttype"=>"application/json", "knativearrivaltime"=>"2021-12-14T23:44:09.238452314Z"}
Cleanup
kn trigger delete send-trigger
kn source binding delete broker-binding
Trigger 'send-trigger' deleted in namespace 'default'.
Sink binding 'broker-binding' deleted in namespace 'default'.
On a channel
Create channel
Simple.
kn channel create chatter
Create source binding to the channel
kn source binding create chat-binding \
--subject Service:serving.knative.dev/v1:ruby-send \
--sink channel:chatter
Subscribe to the event
Here we create the subscription, which goes to our service.
kn subscription create message-sub \
--channel chatter \
--sink ruby-echo
Subscription 'message-sub' created in namespace 'default'.
curl http://ruby-echo.default.gitgratitude.com
Hello from the ruby echo service
curl "http://ruby-send.default.gitgratitude.com?message=Hello"
sent
curl http://ruby-echo.default.gitgratitude.com
Hello from the ruby echo service {"specversion"=>"1.0", "id"=>"1234-1234-1234", "source"=>"/mycontext", "type"=>"com.example.someevent", "data_encoded"=>"{\"message\":\"Hello\"}", "data"=>{"message"=>"Hello"}, "datacontenttype"=>"application/json"}
Cleanup
kn channel delete chatter
kn source binding delete chat-binding
kn subscription delete message-sub
ruby-echo
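require 'sinatra'
require "cloud_events"

# Keeps every received event in memory so it can be shown on GET /.
class EventHolder
  def self.add_event( e )
    @events ||= []
    @events << e.to_h
  end

  def self.eventstring
    @events ||= []
    @events.join( "\n" )
  end
end

# Show the greeting followed by every event received so far.
get '/' do
  "Hello from the ruby echo service\n#{EventHolder.eventstring}\n"
end

cloud_events_http = CloudEvents::HttpBinding.default

# Decode each incoming CloudEvent, log it, and store it.
post "/" do
  event = cloud_events_http.decode_event request.env
  logger.info "Received CloudEvent: #{event.to_h}"
  EventHolder.add_event( event )
  # Return an empty body explicitly; otherwise the events array
  # returned by add_event would be used as the response body.
  ""
end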
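For a sense of what decode_event has to do with request.env, here is a simplified sketch of pulling CloudEvent attributes out of a Rack environment in binary content mode. It is an illustration under assumptions (no structured mode, no validation, no payload decoding), not the gem's actual implementation, and ce_attributes is a hypothetical name.

```ruby
# Rack exposes an HTTP header such as "ce-type" as env["HTTP_CE_TYPE"].
CE_PREFIX = "HTTP_CE_".freeze

# Collects the ce-* headers from a Rack env into a hash of attribute name => value.
def ce_attributes(env)
  env.each_with_object({}) do |(key, value), attrs|
    next unless key.start_with?(CE_PREFIX)
    attrs[key.delete_prefix(CE_PREFIX).downcase] = value
  end
end

# A hand-built env for illustration; a real one comes from the web server.
env = {
  "REQUEST_METHOD"      => "POST",
  "HTTP_CE_SPECVERSION" => "1.0",
  "HTTP_CE_TYPE"        => "dev.knative.sources.ping",
  "HTTP_CE_ID"          => "0dd72dfa-eac9-40a1-b48b-00df271cd62e"
}
puts ce_attributes(env)["type"]
```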
ruby-send
require 'sinatra'
require "cloud_events"
require "net/http"
require "uri"

# Wrap the data in a CloudEvent and POST it to whatever K_SINK points at.
def send_message( data )
  event = CloudEvents::Event.create spec_version:      "1.0",
                                    id:                "1234-1234-1234",
                                    source:            "/mycontext",
                                    type:              "com.example.someevent",
                                    data_content_type: "application/json",
                                    data:              data

  cloud_events_http = CloudEvents::HttpBinding.default
  headers, body = cloud_events_http.encode_event event
  Net::HTTP.post URI(ENV['K_SINK']), body, headers
end

# GET /?message=... sends the message as an event to the sink.
get '/' do
  if params[:message]
    if ENV['K_SINK']
      send_message( {message: params[:message] })
      return "sent"
    else
      return "K_SINK not defined"
    end
  else
    return 'Try passing in a message'
  end
end

# Debugging aid: dump the environment, including K_SINK when a binding is in place.
get '/keys' do
  ENV.keys.collect { |key| "#{key}=#{ENV[key]}" }.join( "\n" )
end
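Note that ruby-send answers "sent" whatever the sink replied. One possible refinement, sketched here with a hypothetical delivered? helper, is to look at the response object that Net::HTTP.post returns and only report success on a 2xx status.

```ruby
require "net/http"

# True when the sink answered with any 2xx status.
def delivered?(response)
  response.is_a?(Net::HTTPSuccess)
end

# Responses built by hand for illustration; in ruby-send the value
# would be the return value of Net::HTTP.post.
accepted = Net::HTTPAccepted.new("1.1", "202", "Accepted")
failed   = Net::HTTPInternalServerError.new("1.1", "500", "Internal Server Error")

puts delivered?(accepted)
puts delivered?(failed)
```

Since send_message ends with the Net::HTTP.post call, its return value is already that response, so the route could check it before deciding what to answer.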