In this workshop we will use Java & Spring Cloud Stream (SCS) to create Event-Driven Applications with PubSub+ in PWS.
After creating a few by hand we will see how the AsyncAPI specification can be used to define your event-driven microservices and even generate a microservice!
The recommended IDE for the workshop is Spring Tools Suite (STS) Download Here. STS comes with some niceties, such as autodeploy, when used with spring-boot-devtools. Participants can of course use another IDE if preferred. It is also recommended that you begin the workshop with an empty STS workspace to avoid any unforeseen issues with existing projects/configurations/code.
Required libraries:
$ git clone https://github.com/Mrc0113/workshop-scs-s1p.git
$ git clone git@github.com:Mrc0113/workshop-scs-s1p.git
https://github.com/Mrc0113/workshop-scs-s1p
, click "Clone or download" -> "Download ZIP" & unzip in your desired directoryAfter importing everything you should see the following projects in STS:
$ cd ~/git/solace-workshop-scs/01-scs-workshop-common/
$ mvn clean install
If you want to stand up your Solace PubSub+ Service in Solace Cloud go ahead and login or signup at the Cloud Signup Page. Note that a free tier is available and will work for this workshop. THIS IS THE PREFERRED OPTION FOR THIS WORKSHOP
When developing your application, you may want to test using a local instance of the Solace PubSub+ Event Broker. Refer to the Solace Docker Getting Started Guide to get you up and running quickly with a broker instance running in Docker. IF CHOOSING THIS OPTION YOU WILL HAVE TO RUN YOUR APPS LOCALLY TO USE THE LOCAL BROKER
At the end of this section we will have created the apps below!
The Source will send out tweets that will be received by the marketing Sink.
Before our company can do anything with the tweets we have to start to receive an incoming stream of them! Let's get started! Please navigate to the "02-scs-source-tweets" project in your IDE.
Before we take a look at the code, let's take a quick look at the structure of a Spring Cloud Streams project.
$ mvn spring-boot:run
To do this we will deploy a sink app. Recall that a sink app binds to an INPUT channel.
<ATTENDEE_NAME>
group with your usernameSTS provides integrated support for deploying, running and debugging your SCS services in PWS. In the Boot Dashboard view, configure a connection to your PWS deployment by clicking the "+" button as seen in the image below.
Follow the dialog prompts and fill in the username / password associated with your PWS account.
$ cf login -a <API_URL> -u <USERNAME>
$ Password>
Stop the source & sink that you currently have running locally as we are going to deploy them to PWS using the same PubSub+ instance and want to avoid duplicate messages.
console
view.ctrl-c
to stop the Spring Boot process.<ATTENDEE_NAME>
to your username; this is to ensure your application name is unique from other workshop attendees.$ mvn clean install
$ cf push
$ cf logs 02-scs-source-tweets-<ATTENDEE_NAME>
<ATTENDEE_NAME>
to your username; this is to ensure your application name is unique from other workshop attendees.$ mvn clean install
$ cf push
$ cf logs 03-scs-sink-analytics-<ATTENDEE_NAME>
At the end of this section we will have added the Factory Tweet Board Sink.
We obviously don't have a giant LED board that we can use so we're going to settle for logging the tweets as they come in.
<ATTENDEE_NAME>
with your username (e.g: TWEETS.Q.BOARD.User1). Note that by not specifying a group we are using the "Publish-Subscribe" messaging model.<ATTENDEE_NAME>
to your nameAt this point we have created our "04-scs-sink-tweetboard" application and it needs to be deployed.
$ mvn clean install
$ cf push
$ cf logs 04-scs-sink-tweetboard-<ATTENDEE_NAME>
So far in this workshop we have created source or sink applications. In this section we will create our first processor.
In order to meet our new goal we will add the Features processor and a new Sink as seen below.
<ATTENDEE_NAME>
to be your name each time it occurs in the application.yaml file.<ATTENDEE_NAME>
in the manifest.yml file!)05-scs-processor-feature-<ATTENDEE_NAME>
, then right click and choose stop
cf stop 05-scs-processor-feature-<ATTENDEE_NAME>
command while of course switching out <ATTENDEE_NAME>
for your username<ATTENDEE_NAME>
with your username<ATTENDEE_NAME>
to your username & add your PubSub+ connection details<ATTENDEE_NAME>
with your username.<ATTENDEE_NAME>
with your username in BOTH spots<ATTENDEE_NAME>
in the manifest.yml!)Note that our processor that we created earlier in this lab publishes to multiple topics essentially splitting our feed into two. Due to our new requirements to not show new features on the twitter board we need to update that sink appropriately.
<ATTENDEE_NAME>
/nofeatures"$ cd /path/to/04-scs-sink-tweetboard
$ mvn install
$ cf push
In this section we are going to take a glimpse into the future of Event-Driven microservices by using AsyncAPI to generate a SCS Processor microservice. AsyncAPI is the industry standard for defining asynchronous APIs - more info can be found on the specification website..
We're going to add a "No Yelling" processor in our event driven architecture in order to meet this new need.
Use https
$ git clone https://github.com/jschabowsky/AsyncAPI-Spring-Cloud-Streams-Generator.git
OR Use SSH
$ git clone git@github.com:jschabowsky/AsyncAPI-Spring-Cloud-Streams-Generator.git
OR Navigate to https://github.com/jschabowsky/AsyncAPI-Spring-Cloud-Streams-Generator
, click "Clone or download" -> "Download ZIP" & unzip in your desired directory
This is an example AsyncAPI Specification file that defines our event-driven microservice.
It specifies information about the application such as servers where you can interact with the microservice, channels which messages are exchanged over, and components such as the messages that are expected and the schemas which define them.
In the git repo that you cloned you'll find a 08-yellingArtifacts
directory. Navigate to that directory (keep in mind this directory is not a maven project so it wouldn't have been imported into your IDE as one) and you should find a file called NoYellingProcessor.yaml
Open the file up and take a look at how the microservice is defined using the AsyncAPI specification.
Now that you have cloned the necessary artifacts let's go ahead and generate the project skeleton!
$ cd /path/to/AsyncAPI-SpringCloudStreams-Generator
$ mvn clean install
$ java -jar target/AsyncAPI-SpringCloudStreams-Generator-0.0.1-SNAPSHOT-spring-boot.jar --reactive --p="com.solace.spring.cloud.streams.test" --cu=solace-cloud-client --cp=default --mvpn=default /path/to/workshop-scs-s1p/08-yellingArtifacts/NoYellingProcessor.yaml
Your Project Has Been Generated at: initializr/tmp4062230108469339772
Open the ScsprocessoryellingApplication class
Add the spring.cloud.stream.function.definition
argument to the SpringApplication.run
command in the main
method as seen below. This specifies which functional bean to bind to the external destination(s) exposed by the bindings.
SpringApplication.run(ScsprocessoryellingApplication.class, "--spring.cloud.stream.function.definition=handleInboundTweet");
Update the handleInboundTweet method to change uppercase letters to lowercae letters in the tweet text using the reactive programming model; an example of this can be seen in the code snippet below.
Note that although we still have the @EnableBinding(Processor.class) annotation we are now binding a bean of type "java.util.function.Function" to the external destinations exposed by the bindings by providing the spring.cloud.stream.function.definition property.
package com.solace.spring.cloud.streams.test;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.context.annotation.Bean;
import com.solace.spring.cloud.streams.test.types.Tweet;
import reactor.core.publisher.Flux;
@SpringBootApplication
@EnableBinding(Processor.class)
public class ScsprocessoryellingApplication {
private static final Logger logger = LoggerFactory.getLogger(ScsprocessoryellingApplication.class);
public static void main(String[] args) {
// Defining the reactive function to bind to the INPUT channel of the Processor
SpringApplication.run(ScsprocessoryellingApplication.class, "--spring.cloud.stream.function.definition=handleInboundTweet");
}
@Bean
public Function<Flux<Tweet>, Flux<Tweet>> handleInboundTweet() {
return flux -> flux
.doOnNext(t ->logger.info("====Tweet BEFORE mapping: " + t.toString()))
.map(t -> { t.setText(t.getText().toLowerCase());
return t;
})
.doOnNext(t ->logger.info("++++Tweet AFTER mapping: " + t.toString()))
;
}
At this point our microservice is ready to run, but in order to deploy to PWS we need to create a manifest.yml file at the top level of the project.
Create a manifest.yml
file with the contents below - make sure to change <ATTENDEE_NAME>
to your username.
---
applications:
- name: scsprocessoryelling-<ATTENDEE_NAME>
memory: 1024M
path: target/scsprocessoryelling-0.0.1-SNAPSHOT.jar
At this point we have created our no yelling microservice and it needs to be deployed.
$ mvn clean install
$ cf push
$ cf logs scsprocessoryelling-<ATTENDEE_NAME>
Pretty cool huh? With AsyncAPI & the Spring Cloud Stream generator all you had to do was enter your business logic and your event-driven microservice was ready to go!
A processor will be added to our architecture in order to convert negative words to positive ones.
Let's get started and hopefully have a bit of fun!
<ATTENDEE_NAME>
to your name<ATTENDEE_NAME>
placeholders with your name (in the input group, output destination & queueAdditionalSubscriptions)<ATTENDEE_NAME>
/nofeatures/noyelling/positive" replacing <ATTENDEE_NAME>
with your name$ cd /path/to/app
$ mvn install
$ cf push
To meet this new requirement we are going to add the MQTT Web App shown in the diagram below:
$ cf app 10-spring-boot-mqttwebapp
This course was just an introduction to Spring Cloud Streams, but we've included some resources below if you're interested in learning more about it or some of the features that complement it! Happy Learning :)