6

JMS Throttling With Camunda Application

 2 years ago
source link: https://dzone.com/articles/jms-throttling-with-camunda-application
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

IBM MQ exposes the JMS interface to connect with Camunda Spring boot using the mq-jms-spring-boot-starterlibrary as a dependency in the POM.xml file. The message being delivered to Camunda is modeled as XML- SOAP format and posted on queue through MQJ explorer. Here, MQJ explorer acts as a client to IBM MQ. We will have methods to start and stop the JMS listener, as well as methods to know the status and setting maxConnectionSize.

Prerequisites

  • Eclipse (any version) with Maven capabilities
  • Java 8+
  • IBM MQ and MQJ Explorer
  • Camunda

Installing Eclipse-IDE on Windows

Creating a Maven Project in Eclipse IDE

  1. Open the Eclipse IDE.
  2. Go to File  > New > Project.
    Screenshot of new project in file
  3. Go to Maven -> Maven Project and click Next.
    New maven project screenshot
  4. Select your workspace location and click Next.
    Screenshot to select workspace location
  5. Select quick start maven archetype and click Next.
    Quick start maven screenshot
  6. Enter Group Id, Artifact Id, and package name.

Maven project setup screenshot

Group Id: Fill in a groupId for the project of your choice.

Artifact Id: Fill artifactId for the project of your choice

Package: Java package structure of your choice

The above process will create a project structure like below:

JMSCamundaThrottleDemo file structure

7. Create a package like com.example.demo.delegate under src/main/java folder and create a source folder src/main/resources folder.

8. Place the CamundaApplication.java file in the com.example.demo package. 

package com.example.demo;


import java.io.ByteArrayInputStream;
import java.util.HashMap;
import java.util.Map;

import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamReader;

import org.camunda.bpm.engine.ProcessEngine;
import org.camunda.bpm.engine.ProcessEngines;
import org.camunda.bpm.engine.RuntimeService;
import org.camunda.bpm.engine.variable.Variables;
import org.camunda.bpm.engine.variable.value.ObjectValue;
import org.camunda.bpm.spring.boot.starter.annotation.EnableProcessApplication;
import org.camunda.bpm.spring.boot.starter.event.PostDeployEvent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jms.JmsAutoConfiguration;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.event.EventListener;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.annotation.JmsListener;



@ComponentScan("com.example.demo")
@SpringBootApplication(scanBasePackages = "com.example.demo.*",exclude={JmsAutoConfiguration.class})
@EnableProcessApplication()
@EnableJms
public class CamundaApplication {

	

	Object lock = new Object();
	boolean firstTimeP = true;
	

	@Autowired
	ProcessEngine engine = ProcessEngines.getDefaultProcessEngine();

	public static void main(String[] args) {
		SpringApplication.run(CamundaApplication.class, args);

	}

	
	String xmlRequestPublishBilling = null;
	

	@EventListener
	public void onPostDeploy(PostDeployEvent event) {

		synchronized (lock) {
			lock.notifyAll();
			
			firstTimeP = false;
			
		}

	}

	@JmsListener(destination = "${ibm.mq.queue.publishBilling}")
	public void listenerPublish(Object message) {

		if (firstTimeP) {
					synchronized (lock) {
				try {
					
					lock.wait();

				} catch (InterruptedException e) {
					e.printStackTrace();

				}
			}
		}

		try {
			if (message instanceof com.ibm.jms.JMSBytesMessage) {
				com.ibm.jms.JMSBytesMessage mess = (com.ibm.jms.JMSBytesMessage) message;
				mess.acknowledge();
				byte[] payload = new byte[(int) mess.getBodyLength()];
				mess.readBytes(payload);
				xmlRequestPublishBilling = new String(payload);
			} else {
				com.ibm.jms.JMSTextMessage mess = (com.ibm.jms.JMSTextMessage) message;
				xmlRequestPublishBilling = mess.getText();
				mess.acknowledge();

			}

			String transactionId = null;
			byte[] byteArray;
			byteArray = xmlRequestPublishBilling.getBytes("UTF-8");
			ByteArrayInputStream inputStream = new ByteArrayInputStream(byteArray);
			XMLInputFactory inputFactory = XMLInputFactory.newInstance();
			XMLStreamReader streamReader = inputFactory.createXMLStreamReader(inputStream);

			while (streamReader.hasNext()) {
				// Move to next event
				streamReader.next();

				// Check if its 'START_ELEMENT'
				if (streamReader.getEventType() == XMLStreamReader.START_ELEMENT) {

					String tagName = streamReader.getLocalName();
					if (tagName.equalsIgnoreCase("transactionId")) {

						transactionId = streamReader.getElementText();
						

					}
				}

			}

			RuntimeService runtimeService = engine.getRuntimeService();
			Map<String, Object> processVariableMap = new HashMap<String, Object>();
			ObjectValue xmlValue = Variables.objectValue(xmlRequestPublishBilling)
					.serializationDataFormat("application/xml").create();
			processVariableMap.put(Constant.REQUEST, xmlValue);
			

			runtimeService.createMessageCorrelation("RecpSrvc_Billing_Initiator_Message")
					.processInstanceBusinessKey(transactionId).setVariables(processVariableMap).correlateStartMessage();

		} catch (Exception e) {

			e.printStackTrace();

		} 

	}



}

9. Add the Constant.java file in the com.example.demo package.

package com.example.demo;

public class Constant {

	// ****** Error State Constants*****
	public static final String REQUEST = "request";

}

10. Add O2JMSController.java file in com.example.demo package.

package com.example.demo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.config.JmsListenerEndpointRegistry;
import org.springframework.jms.listener.MessageListenerContainer;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQSimpleConnectionManager;

@RestController
@RequestMapping("/jms")
public class O2JMSController {
	private static final Logger LOGGER = LoggerFactory.getLogger(O2JMSController.class);

	@Autowired
	JmsListenerEndpointRegistry jmsListenerEndpointRegistry;

	static MQSimpleConnectionManager connectionMgr = null;
	static int size = 40;

	@RequestMapping(value = "/publishBilling/stop", method = RequestMethod.POST)
	String stopPublishBilling(@RequestBody String command) {

		MessageListenerContainer container = jmsListenerEndpointRegistry.getListenerContainer("PublishListener");
		container.stop();

		return "PublishBilling JMS Listener Stopped";

	}

	@RequestMapping(value = "/publishBilling/start", method = RequestMethod.POST)
	String startPublishBilling(@RequestBody String command) {

		MessageListenerContainer container = jmsListenerEndpointRegistry.getListenerContainer("PublishListener");
		container.start();

		return "PublishBilling JMS Listener Started";

	}

	@RequestMapping(value = "/publishBilling/status", method = RequestMethod.POST)
	String getStatusPublishBilling(@RequestBody String command) {

		MessageListenerContainer container = jmsListenerEndpointRegistry.getListenerContainer("PublishListener");

		boolean status = container.isRunning();
		if (status) {

			return "PublishBilling JMS Listener Running";
		} else {

			return "PublishBilling JMS Listener not Running";
		}
	}

	@RequestMapping(value = "/maxConnectionSize", method = RequestMethod.POST)
	String setMaxConnections(@RequestBody String connectionSize) {

		size = Integer.parseInt(connectionSize);
		stopPublishBilling(null);

		if (connectionMgr == null) {

			connectionMgr = new MQSimpleConnectionManager();

		}
		connectionMgr.setActive(MQSimpleConnectionManager.MODE_AUTO);

		connectionMgr.setMaxConnections(size);

		MQEnvironment.setDefaultConnectionManager(connectionMgr);

		startPublishBilling(null);

		return "maxConnections is set :: " + size;
	}

	@RequestMapping(value = "/maxConnectionSize", method = RequestMethod.GET)
	String getMaxConnections() {

		// int connectionSize = 0;

		return "maxConnection size is :: " + size;
	}

}

11. Add the BillingDelegate.java file in com.example.demo.delegate package.

package com.example.dem.delegate;

import org.camunda.bpm.engine.delegate.DelegateExecution;
import org.camunda.bpm.engine.delegate.JavaDelegate;
import org.camunda.bpm.engine.variable.value.ObjectValue;
import org.springframework.stereotype.Component;

import com.example.demo.Constant;

@Component("BillingDelegate")
public class BillingDelegate implements JavaDelegate {

	public void execute(DelegateExecution execution) {

		ObjectValue xmlRequestObj = (ObjectValue) execution.getVariableTyped(Constant.REQUEST);

		String xmlRequest = (String) xmlRequestObj.getValue().toString();

		System.out.println("Request :" + xmlRequest);

	}
}

12. Add application.properties, application.yaml, and RcsBillingEventInitiator.bpmn in /src/main/resources folder.

application.properties

                        ibm.mq.queueManager=M0DCRMT3

ibm.mq.channel=BOSS.SVRCONN

ibm.mq.connName=iv4239.uname.telecom.co.nz(1434)

ibm.mq.user=

ibm.mq.password=

ibm.mq.pool.enabled=true

ibm.mq.pool.maxConnections=40

ibm.mq.pool.maxSessionsPerConnection=500

ibm.mq.queue.publishBilling=DEV/BOSS/PUBLISH_BILLING_EVENT

server.port=8080

application.yaml

spring.h2.console.enabled: true

spring.datasource:

url: jdbc:h2:./camunda-h2-database;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE;AUTO_SERVER=TRUE;

username: sa

password: sa

camunda.bpm:

  admin-user:

                    id: demo

                    password: demo

                    firstName: Demo

                    lastName: Demo

              filter:

                    create: All TasksReceptor Service Billing Event Initiator

RcsBillingEventInitiator.bpmn

	<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:camunda="http://camunda.org/schema/1.0/bpmn" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" id="Definitions_0xx4klu" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="4.4.0">
  <bpmn:collaboration id="Collaboration_1g0hs7s">
    <bpmn:participant id="RCS_BillingEvent_Initiator" name="Receptor Service Billing Event Initiator" processRef="RCS_BillingEvent_Initiator_Group" />
  </bpmn:collaboration>
  <bpmn:process id="RCS_BillingEvent_Initiator_Group" isExecutable="true">
    <bpmn:startEvent id="RecpSrvc_Billing_Initiator_Start" name="RecpSrvc Billing Initiator Start">
      <bpmn:outgoing>Flow_1jexk4p</bpmn:outgoing>
      <bpmn:messageEventDefinition id="MessageEventDefinition_0qqrwpx" messageRef="Message_1suodic" />
    </bpmn:startEvent>
    <bpmn:sequenceFlow id="Flow_1jexk4p" sourceRef="RecpSrvc_Billing_Initiator_Start" targetRef="RecpSrvc_Billing_Initiator_DuplicateEventValidation" />
    <bpmn:endEvent id="Event_1tqof64" name="End Eent">
      <bpmn:incoming>Flow_06bsz4a</bpmn:incoming>
    </bpmn:endEvent>
    <bpmn:sequenceFlow id="Flow_06bsz4a" sourceRef="RecpSrvc_Billing_Initiator_DuplicateEventValidation" targetRef="Event_1tqof64" />
    <bpmn:serviceTask id="RecpSrvc_Billing_Initiator_DuplicateEventValidation" name="Validating the Received message" camunda:delegateExpression="#{BillingDelegate}">
      <bpmn:incoming>Flow_1jexk4p</bpmn:incoming>
      <bpmn:outgoing>Flow_06bsz4a</bpmn:outgoing>
    </bpmn:serviceTask>
    <bpmn:textAnnotation id="TextAnnotation_0kpt6hp">
      <bpmn:text>Receives publish billing event from MQ series</bpmn:text>
    </bpmn:textAnnotation>
    <bpmn:association id="Association_0ee69pk" sourceRef="RecpSrvc_Billing_Initiator_Start" targetRef="TextAnnotation_0kpt6hp" />
    <bpmn:textAnnotation id="TextAnnotation_1k38r2c">
      <bpmn:text>Validating the received XML</bpmn:text>
    </bpmn:textAnnotation>
    <bpmn:association id="Association_1kyku2l" sourceRef="RecpSrvc_Billing_Initiator_DuplicateEventValidation" targetRef="TextAnnotation_1k38r2c" />
    <bpmn:textAnnotation id="TextAnnotation_1qzlsj7">
      <bpmn:text>Ending the flow</bpmn:text>
    </bpmn:textAnnotation>
    <bpmn:association id="Association_1my84xb" sourceRef="Event_1tqof64" targetRef="TextAnnotation_1qzlsj7" />
  </bpmn:process>
  <bpmn:message id="Message_1suodic" name="RecpSrvc_Billing_Initiator_Message" />
  <bpmn:message id="Message_1vnhkcs" name="RecpSrvc_Billing_Pre_Processor_Msg" />
  <bpmn:error id="Error_14k81br" name="BPMN_ERROR" errorCode="BPMN_ERROR" />
  <bpmndi:BPMNDiagram id="BPMNDiagram_1">
    <bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="Collaboration_1g0hs7s">
      <bpmndi:BPMNShape id="Participant_07448el_di" bpmnElement="RCS_BillingEvent_Initiator" isHorizontal="true">
        <dc:Bounds x="160" y="80" width="670" height="350" />
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape id="TextAnnotation_1k38r2c_di" bpmnElement="TextAnnotation_1k38r2c">
        <dc:Bounds x="530" y="150" width="100" height="40" />
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape id="TextAnnotation_1qzlsj7_di" bpmnElement="TextAnnotation_1qzlsj7">
        <dc:Bounds x="710" y="220" width="100" height="30" />
      </bpmndi:BPMNShape>
      <bpmndi:BPMNEdge id="Flow_1jexk4p_di" bpmnElement="Flow_1jexk4p">
        <di:waypoint x="246" y="327" />
        <di:waypoint x="410" y="327" />
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNEdge id="Flow_06bsz4a_di" bpmnElement="Flow_06bsz4a">
        <di:waypoint x="510" y="327" />
        <di:waypoint x="672" y="327" />
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNShape id="Event_1btznrj_di" bpmnElement="RecpSrvc_Billing_Initiator_Start">
        <dc:Bounds x="210" y="309" width="36" height="36" />
        <bpmndi:BPMNLabel>
          <dc:Bounds x="190" y="352" width="81" height="27" />
        </bpmndi:BPMNLabel>
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape id="TextAnnotation_0kpt6hp_di" bpmnElement="TextAnnotation_0kpt6hp">
        <dc:Bounds x="230" y="150" width="100" height="68" />
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape id="Event_1tqof64_di" bpmnElement="Event_1tqof64">
        <dc:Bounds x="672" y="309" width="36" height="36" />
        <bpmndi:BPMNLabel>
          <dc:Bounds x="667" y="352" width="47" height="14" />
        </bpmndi:BPMNLabel>
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape id="Activity_1efnk0s_di" bpmnElement="RecpSrvc_Billing_Initiator_DuplicateEventValidation">
        <dc:Bounds x="410" y="287" width="100" height="80" />
      </bpmndi:BPMNShape>
      <bpmndi:BPMNEdge id="Association_0ee69pk_di" bpmnElement="Association_0ee69pk">
        <di:waypoint x="234" y="310" />
        <di:waypoint x="263" y="218" />
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNEdge id="Association_1kyku2l_di" bpmnElement="Association_1kyku2l">
        <di:waypoint x="484" y="287" />
        <di:waypoint x="541" y="190" />
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNEdge id="Association_1my84xb_di" bpmnElement="Association_1my84xb">
        <di:waypoint x="701" y="313" />
        <di:waypoint x="749" y="250" />
      </bpmndi:BPMNEdge>
    </bpmndi:BPMNPlane>
  </bpmndi:BPMNDiagram>
</bpmn:definitions>

13. Replace the pom.xml with the below content.

<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	  <groupId>JMSCamundaThrottleDemo</groupId>
  <artifactId>JMSCamundaThrottleDemo</artifactId>
  <version>0.0.1-SNAPSHOT</version>

  <name>JMSCamundaThrottleDemo</name>
  <description>JMSCamundaThrottleDemo</description>
	
	 <properties>
    <camunda.version>7.14.0</camunda.version>
    <cxf.version>3.3.6</cxf.version>

    <camundaSpringBoot.version>7.14.0</camundaSpringBoot.version>
    <springBoot.version>2.2.5.RELEASE</springBoot.version>

    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <version.java>1.8</version.java>

    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <failOnMissingWebXml>false</failOnMissingWebXml>
  </properties>


	<dependencies>
        <dependency>
            <groupId>org.camunda.bpm.springboot</groupId>
            <artifactId>camunda-bpm-spring-boot-starter-webapp</artifactId>
                <version>7.16.0-alpha2</version>
        </dependency>
        <dependency>
			<groupId>org.camunda.bpm.springboot</groupId>
			<artifactId>camunda-bpm-spring-boot-starter-rest</artifactId>
			<version>7.16.0-alpha2</version>
		</dependency>
				<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-core</artifactId>
			 <version>5.3.8</version>
		</dependency>
		<dependency>
			<groupId>com.fasterxml.jackson.core</groupId>
			<artifactId>jackson-databind</artifactId>
			 <version>2.12.3</version>
		</dependency>
		<dependency>
			<groupId>org.apache.httpcomponents</groupId>
			<artifactId>httpclient</artifactId>
			   <version>4.5.13</version>
		</dependency>
		<dependency>
			<groupId>org.camunda.spin</groupId>
			<artifactId>camunda-spin-dataformat-all</artifactId>
			<version>1.10.1</version>
		</dependency>
		<dependency>
			<groupId>org.camunda.bpm</groupId>
			<artifactId>camunda-engine-plugin-spin</artifactId>
			<version>7.16.0-alpha2</version>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-actuator</artifactId>
			<version>2.5.1</version>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-jdbc</artifactId>
			<version>2.5.1</version>
		</dependency>
		<dependency>
			<groupId>org.camunda.spin</groupId>
			<artifactId>camunda-spin-core</artifactId>
			 <version>1.10.1</version>
		</dependency>


		<!-- PostgreSQL -->
		<dependency>
			<groupId>org.postgresql</groupId>
			<artifactId>postgresql</artifactId>
			 <version>42.2.22</version>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
			 <version>2.5.1</version>
		</dependency>
		
		<dependency>
			<groupId>com.ibm.mq</groupId>
			<artifactId>mq-jms-spring-boot-starter</artifactId>
			<version>2.4.1</version>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.camunda.bpm/camunda-engine -->

	
		<dependency>
			<groupId>io.jaegertracing</groupId>
			<artifactId>jaeger-client</artifactId>
			<version>1.6.0</version>
		</dependency>
		<dependency>
			<groupId>io.opentracing.contrib</groupId>
			<artifactId>opentracing-spring-jaeger-cloud-starter</artifactId>
			<version>3.3.1</version>
		</dependency>
		<dependency>
			<groupId>com.h2database</groupId>
			<artifactId>h2</artifactId>
			 <version>1.4.200</version>
		</dependency>
		
		<dependency>
			<groupId>net.logstash.logback</groupId>
			<artifactId>logstash-logback-encoder</artifactId>
			 <version>6.6</version>
		</dependency>

	
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-jms</artifactId>
			<version>5.3.8</version>
		</dependency>


	</dependencies>
	
  <repositories>
     <repository>
      <id>camunda-bpm-nexus</id>
      <name>Camunda Maven Repository</name>
      <url>https://app.camunda.com/nexus/content/groups/public</url>
    </repository>
    <!-- enable this for EE dependencies (requires credentials in ~/.m2/settings.xml)-->
<!--     <repository>
      <id>camunda-bpm-nexus-ee</id>
      <name>Camunda Enterprise Maven Repository</name>
      <url>https://app.camunda.com/nexus/content/repositories/camunda-bpm</url>
    </repository> -->
    
  </repositories>
	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
				 <version>2.5.1</version>
			</plugin>
			<plugin>
				<groupId>com.google.cloud.tools</groupId>
				<artifactId>jib-maven-plugin</artifactId>
				<version>0.4.0</version>
			</plugin>

		</plugins>
	</build>
</project>

Testing

  1. Run the Camunda Application.
  2. Starting the JMS listener:

Starting the JMS listener

3. Stopping the JMS listener:

Stopping JMS listener

4. Knowing the status:Know JMS listener status

5. Setting maxConnectionSize:

Setting maxconnectionsize

This concludes what we have learned about JMS throttling with Camunda Application.


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK