Spring Integration Java DSL + Activiti

By : Bully
Source: Stackoverflow.com
Question!

I watched a video about Spring + Activiti: https://m.youtube.com/watch?v=0PV_8Lew3vg. I need the process to move immediately to the serviceTask, where Activiti invokes my gateway, which sends a request message to a queue (RabbitMQ). After the request message is sent, the process should pause. The serviceTask should then resume as soon as a response message arrives in the response queue. The serviceTask may take a very long time.

I tried the example from the webinar and it works fine, but it is synchronous.

This is my ActivitiDemoApplication.java

package com.example;

import java.util.Map;

import javax.sql.DataSource;

import org.activiti.engine.ProcessEngine;
import org.activiti.engine.RuntimeService;
import org.activiti.engine.TaskService;
import org.activiti.engine.impl.pvm.delegate.ActivityExecution;
import org.activiti.engine.runtime.ProcessInstance;
import org.activiti.spring.integration.ActivitiInboundGateway;
import org.activiti.spring.integration.IntegrationActivityBehavior;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.integration.activiti.gateway.AsyncActivityBehaviorMessagingGateway;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.support.GenericHandler;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

/**
 * Spring Boot entry point that wires an Activiti service task to a Spring
 * Integration flow.
 *
 * <p>The BPMN service task resolves the {@code activitiDelegate} bean through
 * its delegate expression. That bean hands the execution to the
 * {@link ActivitiInboundGateway}, and the {@link #inboundProcess} flow handles
 * the resulting message by invoking {@link PhotoService#Execute()} inline and
 * replying with a {@code processed} header.
 */
@Configuration
@ComponentScan
@SpringBootApplication
public class ActivitiDemoApplication {

    /**
     * Boots the Spring application context.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(ActivitiDemoApplication.class, args);
    }

    /**
     * Activity behavior referenced from the process definition as
     * {@code #{activitiDelegate}}.
     *
     * @param activitiInboundGateway gateway that receives the execution
     * @return the behavior bound to the given gateway
     */
    @Bean
    IntegrationActivityBehavior activitiDelegate(
            ActivitiInboundGateway activitiInboundGateway) {
        return new IntegrationActivityBehavior(activitiInboundGateway);
    }

    /**
     * Gateway between the Activiti engine and the integration flow.
     *
     * @param processEngine the Activiti process engine
     * @return a gateway configured with the name {@code "processed"}
     */
    @Bean
    ActivitiInboundGateway inboundGateway(ProcessEngine processEngine) {
        // NOTE(review): "processed" is presumably the header/variable name the
        // gateway carries back into the process - confirm against Activiti docs.
        return new ActivitiInboundGateway(processEngine, "processed");
    }

    /**
     * Flow that handles messages coming from the Activiti gateway.
     *
     * <p>The handler calls {@link PhotoService#Execute()} directly and then
     * replies with the execution as payload, the {@code processed} header set
     * to {@code true}, and the inbound headers copied on top.
     *
     * @param activitiInboundGateway source of the flow
     * @param photoService service invoked for every message
     * @return the assembled integration flow
     */
    @Bean
    IntegrationFlow inboundProcess(
            ActivitiInboundGateway activitiInboundGateway,
            PhotoService photoService) {
        return IntegrationFlows.from(activitiInboundGateway)
                .handle(new GenericHandler<ActivityExecution>() {
                    @Override
                    public Object handle(ActivityExecution execution,
                            Map<String, Object> headers) {
                        try {
                            photoService.Execute();
                        } catch (InterruptedException e) {
                            // Restore the interrupt status and fail instead of
                            // reporting unfinished work as processed.
                            Thread.currentThread().interrupt();
                            throw new IllegalStateException(
                                    "Interrupted while executing PhotoService", e);
                        }
                        return MessageBuilder.withPayload(execution)
                                .setHeader("processed", true)
                                .copyHeaders(headers).build();
                    }
                }).get();
    }

    /**
     * Data source pointing at a local SQL Server database named
     * {@code activiti}.
     *
     * @return the configured data source
     */
    @Bean
    public DataSource database() {
        // NOTE(review): URL and credentials are hard-coded; consider moving
        // them to externalized configuration.
        return DataSourceBuilder
                .create()
                .url("jdbc:sqlserver://localhost:1433;databaseName=activiti")
                .username("activiti")
                .password("activiti")
                .driverClassName("com.microsoft.sqlserver.jdbc.SQLServerDriver")
                .build();
    }

    /** REST endpoint used to start an instance of the demo process. */
    @RestController
    public static class MyRestController {
        @Autowired
        private RuntimeService runtimeService;

        /**
         * Starts a {@code TestProcess} instance, then prints its id and the
         * current number of process instances to standard output.
         */
        @RequestMapping(value = "/start-my-process", method = RequestMethod.GET, produces = MediaType.APPLICATION_JSON_VALUE)
        public void startMyProcess() {
            ProcessInstance p = runtimeService
                    .startProcessInstanceByKey("TestProcess");
            System.out.println("id: " + p.getId());
            System.out.println("count: "
                    + runtimeService.createProcessInstanceQuery().count());
        }

    }
}

/**
 * Demo service standing in for a long-running job triggered from the
 * integration flow.
 */
@Service
@Transactional
class PhotoService {
    // Injected but not used by any code visible in this class.
    @Autowired
    private TaskService taskService;

    /**
     * Simulates slow work: prints {@code "debug 1"}, sleeps for two seconds,
     * then prints {@code "debug 2"}.
     *
     * @throws InterruptedException if the calling thread is interrupted while
     *         sleeping
     */
    public void Execute() throws InterruptedException {
        System.out.println("debug 1");
        // sleep is static and always affects the calling thread, so call it
        // on the class rather than through Thread.currentThread().
        Thread.sleep(2000);
        System.out.println("debug 2");
    }
}

This is my TestProcess.bpmn20.xml

<?xml version='1.0' encoding='UTF-8'?>
<definitions xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema"
    xmlns:activiti="http://activiti.org/bpmn" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI"
    xmlns:omgdc="http://www.omg.org/spec/DD/20100524/DC" xmlns:omgdi="http://www.omg.org/spec/DD/20100524/DI"
    typeLanguage="http://www.w3.org/2001/XMLSchema" expressionLanguage="http://www.w3.org/1999/XPath"
    targetNamespace="http://www.activiti.org/processdef" xmlns:modeler="http://activiti.com/modeler"
    modeler:version="1.0ev" modeler:exportDateTime="20151228174550"
    modeler:modelId="969411" modeler:modelVersion="1"
    modeler:modelLastUpdated="1451324745996">
    <process id="TestProcess" name="TestProcess" isExecutable="true">
        <!-- Flow order (see the sequenceFlow elements below):
             startEvent1, then task-integration, then the timer catch event, then the end event. -->
        <startEvent id="startEvent1" />
        <!-- Intermediate timer catch event with a duration of PT20S (20 seconds). -->
        <intermediateCatchEvent id="sid-0BAE26E1-BB48-49FD-91E4-246B85C6B8B1">
            <timerEventDefinition>
                <timeDuration>PT20S</timeDuration>
            </timerEventDefinition>
        </intermediateCatchEvent>
        <endEvent id="sid-23D49CE8-B018-4ABF-871F-07F42508C98A" />
        <!-- Timer catch event to end event. -->
        <sequenceFlow id="sid-53B3780A-A1F6-4059-9660-D56CECD236F1"
            sourceRef="sid-0BAE26E1-BB48-49FD-91E4-246B85C6B8B1" targetRef="sid-23D49CE8-B018-4ABF-871F-07F42508C98A" />
        <!-- Service task whose behavior is resolved from the delegate expression activitiDelegate. -->
        <serviceTask id="task-integration" name="task integration"
            activiti:delegateExpression="#{activitiDelegate}" />
        <!-- Service task to timer catch event. -->
        <sequenceFlow id="sid-4DDAAA87-536F-4E69-BDC8-09B96DB7BD07"
            sourceRef="task-integration" targetRef="sid-0BAE26E1-BB48-49FD-91E4-246B85C6B8B1" />
        <!-- Start event to service task. -->
        <sequenceFlow id="sid-3F3E9366-BAF9-40AF-9E34-E324A91291F7"
            sourceRef="startEvent1" targetRef="task-integration" />
    </process>
    <bpmndi:BPMNDiagram id="BPMNDiagram_TestProcess">
        <bpmndi:BPMNPlane bpmnElement="TestProcess" id="BPMNPlane_TestProcess">
            <bpmndi:BPMNShape bpmnElement="startEvent1"
                id="BPMNShape_startEvent1">
                <omgdc:Bounds height="30.0" width="30.0" x="100.0" y="163.0" />
            </bpmndi:BPMNShape>
            <bpmndi:BPMNShape bpmnElement="sid-0BAE26E1-BB48-49FD-91E4-246B85C6B8B1"
                id="BPMNShape_sid-0BAE26E1-BB48-49FD-91E4-246B85C6B8B1">
                <omgdc:Bounds height="31.0" width="31.0" x="480.0" y="162.5" />
            </bpmndi:BPMNShape>
            <bpmndi:BPMNShape bpmnElement="sid-23D49CE8-B018-4ABF-871F-07F42508C98A"
                id="BPMNShape_sid-23D49CE8-B018-4ABF-871F-07F42508C98A">
                <omgdc:Bounds height="28.0" width="28.0" x="570.0" y="161.0" />
            </bpmndi:BPMNShape>
            <bpmndi:BPMNShape bpmnElement="task-integration"
                id="BPMNShape_task-integration">
                <omgdc:Bounds height="80.0" width="100.0" x="340.0" y="135.0" />
            </bpmndi:BPMNShape>
            <bpmndi:BPMNEdge bpmnElement="sid-3F3E9366-BAF9-40AF-9E34-E324A91291F7"
                id="BPMNEdge_sid-3F3E9366-BAF9-40AF-9E34-E324A91291F7">
                <omgdi:waypoint x="130.0" y="178.0" />
                <omgdi:waypoint x="162.5" y="178.0" />
                <omgdi:waypoint x="157.0" y="178.0" />
                <omgdi:waypoint x="340.0" y="161.521327014218" />
            </bpmndi:BPMNEdge>
            <bpmndi:BPMNEdge bpmnElement="sid-4DDAAA87-536F-4E69-BDC8-09B96DB7BD07"
                id="BPMNEdge_sid-4DDAAA87-536F-4E69-BDC8-09B96DB7BD07">
                <omgdi:waypoint x="440.0" y="176.4218009478673" />
                <omgdi:waypoint x="480.0062629076814" y="177.5594197983227" />
            </bpmndi:BPMNEdge>
            <bpmndi:BPMNEdge bpmnElement="sid-53B3780A-A1F6-4059-9660-D56CECD236F1"
                id="BPMNEdge_sid-53B3780A-A1F6-4059-9660-D56CECD236F1">
                <omgdi:waypoint x="512.0" y="178.5" />
                <omgdi:waypoint x="540.5" y="178.5" />
                <omgdi:waypoint x="540.5" y="175.0" />
                <omgdi:waypoint x="570.0" y="175.0" />
            </bpmndi:BPMNEdge>
        </bpmndi:BPMNPlane>
    </bpmndi:BPMNDiagram>
</definitions>

I would like to make the process asynchronous.

<serviceTask id="task-integration" name="task integration" activiti:delegateExpression="#{activitiDelegate}"/>

When Activiti invokes my gateway "#{activitiDelegate}", I want to send a request message to a RequestQueue so that "photoService.Execute()" gets executed. The process should then resume when the response message is received from the ResponseQueue.

return MessageBuilder.withPayload(execution).setHeader("processed", (Object) true).copyHeaders(headers).build();

I do not know how I can implement AMQP with the gateway:

    /**
     * Activity behavior referenced from the process definition as
     * {@code #{activitiDelegate}}; it is bound to the given gateway.
     */
    @Bean
    IntegrationActivityBehavior activitiDelegate(
            ActivitiInboundGateway activitiInboundGateway) {
        final IntegrationActivityBehavior behavior =
                new IntegrationActivityBehavior(activitiInboundGateway);
        return behavior;
    }

    /**
     * Gateway between the Activiti engine and the integration flow,
     * configured with the name {@code "processed"}.
     */
    @Bean
    ActivitiInboundGateway inboundGateway(ProcessEngine processEngine) {
        final ActivitiInboundGateway gateway =
                new ActivitiInboundGateway(processEngine, "processed");
        return gateway;
    }

    /**
     * Flow that handles messages coming from the Activiti gateway.
     *
     * <p>The handler calls {@code photoService.Execute()} directly and then
     * replies with the execution as payload, the {@code processed} header set
     * to {@code true}, and the inbound headers copied on top.
     */
    @Bean
    IntegrationFlow inboundProcess(
            ActivitiInboundGateway activitiInboundGateway,
            PhotoService photoService) {
        return IntegrationFlows.from(activitiInboundGateway)
                .handle(new GenericHandler<ActivityExecution>() {
                    @Override
                    public Object handle(ActivityExecution execution,
                            Map<String, Object> headers) {
                        try {
                            photoService.Execute();
                        } catch (InterruptedException e) {
                            // Restore the interrupt status and fail instead of
                            // reporting unfinished work as processed.
                            Thread.currentThread().interrupt();
                            throw new IllegalStateException(
                                    "Interrupted while executing PhotoService", e);
                        }
                        return MessageBuilder.withPayload(execution)
                                .setHeader("processed", true)
                                .copyHeaders(headers).build();
                    }
                }).get();
    }

I am sorry, but I am a beginner in Java Spring Integration. I hope that my explanation is clear.

By : Bully


Answers

Sorry, your question isn't clear, especially for those who aren't familiar with one of the mentioned frameworks.

You say serviceTask, but don't show it in the code. I can only guess that you are talking about that photoService, which isn't called in the IntegrationFlow below anyway. That adds to the confusion.

All I can say is that ActivitiInboundGateway is a blocking component (see ActivitiInboundGateway.execute()):

Message


This video can help you solve your question :)
By: admin