
jbpm deployer – Status and Sourcecode

I got a lot of interesting feedback on my ideas around the jbpm.deployer (see jbpm deployer for jbpm in enterprise environments), and I promised to write a bit more about it. So I will deliver today…

Basically, a lot of people confirmed that the jbpm.deployer is a useful feature for them. Some of them even requested the code, so I guess they really want to use it 🙂 And I know that a few projects I saw in the past built something quite similar.

One more piece of information: Last week the jbpm team met in Antwerp, and I could join for at least one day. There we also discussed the direction of jbpm 4. In the context of running jbpm in the application server or ESB, we concluded that the direction I sketched here is a good way to go. So I think the ideas described here will somehow find their way into jbpm 4 and the next generation of the SOA platform.

This may have one downside: I don’t know whether any effort will be put into bringing the deployer into the existing versions of jbpm and the SOA platform. Nevertheless, I created a JIRA issue for it, where I will also mention this blog post as a reference for the proposed solution.

See the forum discussion in the JBoss ESB developer forum, the forum discussion in the jbpm developer forum and the JIRA issue on the SOA platform.

Different life cycles

One important thing to recognize: the deployment life cycle of jbpm processes differs from that of other artefacts like web applications (.war) or enterprise applications (.ear), because the process definition gets deployed to the database. So if you remove the .par file (the artefact containing the process definition) from the deployment directory, the process definition will not be deleted from the database. Other deployment units behave differently: if you remove a web application, for example, it is gone… The reason is that business processes are long running (maybe weeks or months) and have persistent state in the database.

One thing I recommend (and mentioned in the last post): Don’t put your jbpm action classes in the database in such an environment unless there is a really good reason for it. But be aware: the classes then live in your .esb/.ear or whatever. If you undeploy these, the classes are missing, even though the process instances can still be triggered! This will result in ClassNotFoundExceptions…

Is the deployer the holy grail?

I am proud to say: Yes 😉 No, but honestly: I think the deployer is a good idea if you are running in a Java EE environment or especially in the ESB. But that isn’t the only environment where jbpm is used; we still see a lot of projects doing standalone Swing applications, pure Tomcat stuff or whatever they want. There, the sketched ideas cannot be applied that easily.

And even in the EE environment I am not sure the solution is mature enough yet. Actually, I would like to see some discussion about this; I don’t want to present this code as “the truth”. Somehow a solution should be found that really is part of the SOA platform. This may even involve politics (is it the responsibility of the ESB or the jBPM team?), but I think it is the way to get a good solution into the “product”.

The code

Last but not least, I want to post the code of the deployer here, so you can apply it to your own project and play around with it. Basically it is a simple MBean, which isn’t really rocket science. In the MBean I used a jBPM Command called DeployProcessIfChangedCommand, which deploys a process definition only if it has changed; its code is shown below. To use the jbpm deployer you have to add a jboss-service.xml as described in my previous post:

<?xml version="1.0" encoding="ISO-8859-1"?>
<server>
  <mbean
    code="...jbpm.deployer.SwistecJbpmDeployer"
    name="jboss.esb:service=JbpmDeployer">
    <depends>jboss.ejb3:service=EJB3Deployer</depends>
    <depends>jboss.jca:service=DataSourceBinding,name=jdbc/MysqljBPM</depends>
  </mbean>
</server>

The deployer itself:

/**
 * The JbpmDeployer is a {@link ServiceMBean} and a JBoss {@link SubDeployer}.
 *
 * It has the following responsibilities:
 * <ul>
 *   <li>Picking up .par files which contain a processdefinition.xml</li>
 *   <li>Checking if the processdefinition.xml in the par file has changed.
 *       If it has, the par is deployed automatically to jBPM</li>
 *   <li>Register the responsible {@link ClassLoader} for the par
 *       (normally the ClassLoader of the EAR, which contains the par)
 *       in the central registry as responsible for this process</li>
 * </ul>
 *
 * This MBean is started as a deployer from a jboss-service.xml.
 * If no processdefinition.xml is contained, the archive is not recognized
 * and is passed to the normal EJB3Deployer.
 *
 * @author bernd.ruecker@camunda.com
 */
public class SwistecJbpmDeployer extends SubDeployerSupport implements
    SubDeployer, SwistecJbpmDeployerMBean {

  private static Log log = LogFactory.getLog(SwistecJbpmDeployer.class);

  private SubDeployer thisProxy;

  private HashMap<String, ClassLoader> processClassLoaders =
    new HashMap<String, ClassLoader>();

  private JbpmConfiguration jbpmConfiguration;

  private CommandService commandService;

  public SwistecJbpmDeployer() {
    super();
    setSuffixes(new String[] { ".par", ".jbpm" });
    setRelativeOrder(1100);
    // before EJB 3 deployer (has 400)
    // after ESBDeployer (has 1000)
  }

  /**
   * copied from EJB3Deployer
   */
  public static boolean hasFile(DeploymentInfo di, String filePath) {
    String urlStr = di.url.getFile();
    try {
      URL dd = di.localCl.findResource(filePath);
      if (dd != null) {

        // If the DD url is not a subset of the urlStr then this is
        // coming from a jar referenced by the deployment jar manifest,
        // and this deployment jar should not be treated as a process
        // archive
        if (di.localUrl != null) {
          urlStr = di.localUrl.toString();
        }

        String ddStr = dd.toString();
        if (ddStr.indexOf(urlStr) >= 0) {
          return true;
        }
      }
    } catch (Exception ignore) {
    }
    return false;
  }

  /**
   * check if the deployment unit contains a processdefinition.xml
   */
  protected boolean hasProcessDefinitionFile(DeploymentInfo di) {
    return hasFile(di, "processdefinition.xml");
  }

  /**
   * Returns true if this Deployment is a process archive, so it can be
   * handled by this Deployer
   *
   * @return True if this deployer can deploy the given DeploymentInfo.
   * @jmx:managed-operation
   */
  public boolean accepts(DeploymentInfo di) {
    String urlStr = di.url.getFile();
    if ((urlStr.endsWith(".par") || urlStr.endsWith(".par/"))
        && hasProcessDefinitionFile(di)) {
      log.debug("Jbpm deployer accepted deployment unit " + urlStr);
      return true;
    }
    return false;
  }

  /**
   * Get a reference to the ServiceController
   */
  protected void startService() throws Exception {
    // make a proxy to myself, so that calls from the MainDeployer
    // can go through the MBeanServer, so interceptors can be added
    thisProxy = (SubDeployer) MBeanProxyExt.create( //
        SubDeployer.class, //
        super.getServiceName(), //
        super.getServer());

    // register with the main deployer
    mainDeployer.addDeployer(thisProxy);
    startJbpmConfiguration();

    log.info("Started and listening for process archives");
    super.startService();
  }

  /**
   * Implements the template method in superclass. This method stops all the
   * applications in this server.
   */
  protected void stopService() throws Exception {
    closeJbpmConfiguration();
    // deregister with MainDeployer
    mainDeployer.removeDeployer(thisProxy);

    log.info("Stopped");
    super.stopService();
  }

  /**
   * copied more or less from EJB3Deployer, watch processdefinition.xml
   */
  public void init(DeploymentInfo di) throws DeploymentException {
    try {
      if (di.url.getProtocol().equalsIgnoreCase("file")) {
        File file = new File(di.url.getFile());

        if (!file.isDirectory()) {
          // If not directory we watch the package
          di.watch = di.url;
        } else {
          // If directory we watch the xml files
          di.watch = new URL(di.url, "processdefinition.xml");
        }
      } else {
        // We watch the top only, no directory support
        di.watch = di.url;
      }

    } catch (Exception e) {
      if (e instanceof DeploymentException) {
        throw (DeploymentException) e;
      }
      throw new DeploymentException("failed to initialize", e);
    }

    // invoke super-class initialization
    super.init(di);
  }

  @Override
  public void start(DeploymentInfo di) throws DeploymentException {
    try {
      try {
        log.info("********************************************************");
        log.info("Start jBPM process deployment and register classloader for archive " + di.url);

        ProcessDefinition pd = deployProcess(di);
        rememberProcessClassloader(pd, di);

        log.info("********************************************************");

      } catch (Exception ex) {
        throw new DeploymentException("error while deploying process", ex);
      }
      super.start(di);
    } catch (Exception ex) {
      throw new DeploymentException("Caught exception while deploying " + di.url, ex);
    }
  }

  @Override
  public void stop(DeploymentInfo di) throws DeploymentException {
    log.info("Undeployed jBPM process and deregister classloader for archive " + di.url);

    super.stop(di);
  }

  /**
   * deploy process definitions to jBPM. Uses the {@link DeployProcessIfChangedCommand}
   * which deploys only if the contained process has changed
   *
   * @throws Exception
   */
  public ProcessDefinition deployProcess(DeploymentInfo di) throws Exception {
    URL url = (di.localUrl != null ? di.localUrl : di.url);
    InputStream resourceAsStream = url.openStream();
    try {
      return (ProcessDefinition) commandService.execute( //
          new DeployProcessIfChangedCommand(resourceAsStream));
    } finally {
      // the command has read the archive by now, so release the stream
      resourceAsStream.close();
    }
  }

  /**
   * remember the current context {@link ClassLoader} as responsible for the
   * given {@link ProcessDefinition}.
   */
  public void rememberProcessClassloader(ProcessDefinition pd, DeploymentInfo di) {
    processClassLoaders.put(pd.getName(), di.ucl);
    log.info("set classloader " + di.ucl + " for process " + pd.getName());
  }

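  /**
   * remove the {@link ClassLoader} of the given deployment from the registry
   * for every {@link ProcessDefinition} it was registered for.
   */
  public void removeProcessClassloader(DeploymentInfo di) {
    boolean removed = false;
    // use an explicit iterator (java.util.Iterator): removing from the map
    // inside a for-each over keySet() can throw a ConcurrentModificationException
    Iterator<Map.Entry<String, ClassLoader>> it = processClassLoaders.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<String, ClassLoader> entry = it.next();
      if (entry.getValue() == di.ucl) {
        it.remove();
        removed = true;
        log.info("removed classloader " + di.ucl + " for process " + entry.getKey());
      }
    }
    // warn only if nothing was registered for this classloader
    if (!removed) {
      log.warn("Classloader " + di.ucl + " was not registered for any process, couldn't be unregistered! Deployment unit:\n" + di);
    }
  }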

  /**
   * create {@link JbpmConfiguration}
   * @throws NamingException
   */
  private void startJbpmConfiguration() throws NamingException {
    if (jbpmConfiguration==null) {
      jbpmConfiguration = JbpmConfiguration.getInstance("jbpm.cfg.xml");
      commandService = new CommandServiceImpl(jbpmConfiguration);
    }
  }

  /**
   * close {@link JbpmConfiguration}
   * @throws NamingException
   */
  private void closeJbpmConfiguration() throws NamingException {
    if (jbpmConfiguration!=null) {
      jbpmConfiguration.close();
    }
    commandService = null;
    jbpmConfiguration = null;
  }

  public String getProcessClassloaderInformation() {
    StringBuffer clInfo = new StringBuffer();
    for (String pdName : processClassLoaders.keySet()) {
      ClassLoader cl = processClassLoaders.get(pdName);

      clInfo.append(" [process=").append(pdName);
      clInfo.append(":  ").append(cl).append("] ");
    }
    return clInfo.toString();
  }

  public ClassLoader getProcessClassloader(String name) {
    ClassLoader classLoader = processClassLoaders.get(name);
    if (log.isDebugEnabled())
      log.debug("resolved " + classLoader + " for process definition " + name);
    return classLoader;
  }
}
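
The classloader registry kept inside the deployer can also be looked at in isolation. The following is only a minimal sketch, assuming Java 8 or later; the class ProcessClassLoaderRegistry and its method names are made up for illustration and are not part of jBPM, JBoss or the deployer above. It shows one way to register a classloader per process name and to remove all entries of an undeployed archive without modifying the map while iterating over its key set:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical stand-alone registry: process definition name -> ClassLoader.
 * It only mirrors the map kept by the deployer, nothing more.
 */
class ProcessClassLoaderRegistry {

  private final Map<String, ClassLoader> loaders = new ConcurrentHashMap<>();

  /** Register the classloader responsible for the named process. */
  void register(String processName, ClassLoader loader) {
    loaders.put(processName, loader);
  }

  /**
   * Remove every entry that points to the given classloader.
   * removeIf goes through the collection's own removal logic, so no
   * ConcurrentModificationException can occur here.
   *
   * @return true if at least one entry was removed
   */
  boolean unregister(ClassLoader loader) {
    return loaders.values().removeIf(registered -> registered == loader);
  }

  /** @return the registered classloader, or null if none is known */
  ClassLoader lookup(String processName) {
    return loaders.get(processName);
  }
}
```

The boolean result of unregister can be used to log a warning only when nothing was registered for the classloader.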

The command to deploy only changed process definitions is currently quite simple: it just checks whether the content of the processdefinition.xml has changed (compared as a String, ignoring new lines and tabs). Obviously this could be much more sophisticated, because other whitespace changes, such as spaces used for indentation, can still cause new process versions. For the moment we ignored this problem, since it is quite rare, and if it occurs there is no big consequence (one more process definition; not nice, but it doesn’t do any harm either).

/**
 * command to deploy jbpm process definition only when
 * the process definition has changed.
 *
 * @author bernd.ruecker@camunda.com
 */
public class DeployProcessIfChangedCommand implements Command {

  private static final long serialVersionUID = 1176499448778173883L;

  private Log log = LogFactory.getLog(this.getClass());

  private ZipInputStream inputStream;

  public DeployProcessIfChangedCommand(InputStream parArchiveInputStream) {
    this.inputStream = new ZipInputStream(parArchiveInputStream);
  }

  public Object execute(JbpmContext jbpmContext) throws Exception {
    ProcessDefinition newProcessDefinition =
      ProcessDefinition.parseParZipInputStream(inputStream);
    ProcessDefinition existingProcessDefinition =
      jbpmContext.getGraphSession().findLatestProcessDefinition(newProcessDefinition.getName());

    if (processDefinitionHasChanged(existingProcessDefinition, newProcessDefinition)) {
      jbpmContext.deployProcessDefinition(newProcessDefinition);
      log.info("deployed " + newProcessDefinition.getName() + " to jbpm");
    }

    return newProcessDefinition;
  }

  /**
   * check if there exist differences in the two
   * given process definitions
   */
  private boolean processDefinitionHasChanged(
      ProcessDefinition existingProcessDefinition,
      ProcessDefinition newProcessDefinition) {

    if (existingProcessDefinition==null) {
      // no process definition exists by this name
      log.info("process definition for " + newProcessDefinition.getName()  + " does not exist yet");
      return true;
    }

    String existingPdXml = getProcessdefinitionXml(existingProcessDefinition);
    String newPdXml = getProcessdefinitionXml(newProcessDefinition);

    if (xmlEquals(existingPdXml, newPdXml)) {
      log.info("process definition for " + existingProcessDefinition.getName()  + " has not changed");
      return false;
    }
    else {
      log.info("process definition for " + existingProcessDefinition.getName()  + " has changed");
      return true;
    }
  }

  /**
   * get the processdefinition.xml from the {@link ProcessDefinition}.
   * Because the process is deployed as par (process archive) the xml
   * is linked as {@link FileDefinition} to the {@link ProcessDefinition}.
   */
  private String getProcessdefinitionXml(ProcessDefinition pd) {
    FileDefinition fd = (FileDefinition) pd.getDefinition(FileDefinition.class);
    return new String( fd.getBytes("processdefinition.xml") );
  }

  /**
   * compares if the two xml strings are equals
   */
  private boolean xmlEquals(String xml1, String xml2) {
    return removeIgnoredCharacters(xml1).equals(
        removeIgnoredCharacters(xml2));
  }

  /**
   * replace characters in xml which should be ignored
   * while comparing two processdefinition.xml's.
   * Basically these are new lines and tabs.
   *
   * Removing spaces would be dangerous: maybe only a space in an attribute value has changed!
   *
   * TODO: add more sophisticated way of comparing?
   */
  private String removeIgnoredCharacters(String st) {
    return st.replaceAll("\n", "").replaceAll("\t", "");
  }

}
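
As one possible answer to the TODO in removeIgnoredCharacters, the comparison could work on the parsed XML instead of on strings. This is only a sketch using the DOM API of the JDK, not the code used in the deployer; the class name XmlCompare and the method sameStructure are hypothetical. It parses both documents, drops text nodes that contain nothing but whitespace, and compares the trees with Node.isEqualNode, so indentation changes do not count as differences while changed attribute values still do:

```java
import java.io.StringReader;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
import org.xml.sax.InputSource;

/** Hypothetical helper, not part of jBPM. */
class XmlCompare {

  /** Compare two XML strings by their DOM trees, ignoring whitespace-only text. */
  static boolean sameStructure(String xml1, String xml2) {
    return parse(xml1).isEqualNode(parse(xml2));
  }

  private static Document parse(String xml) {
    try {
      DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
      factory.setNamespaceAware(true);
      factory.setIgnoringComments(true);
      DocumentBuilder builder = factory.newDocumentBuilder();
      Document doc = builder.parse(new InputSource(new StringReader(xml)));
      doc.normalize();
      stripWhitespaceOnlyText(doc);
      return doc;
    } catch (Exception e) {
      throw new IllegalArgumentException("could not parse XML", e);
    }
  }

  /** Recursively remove text nodes that consist of whitespace only. */
  private static void stripWhitespaceOnlyText(Node node) {
    Node child = node.getFirstChild();
    while (child != null) {
      Node next = child.getNextSibling();
      if (child.getNodeType() == Node.TEXT_NODE
          && child.getNodeValue().trim().isEmpty()) {
        node.removeChild(child);
      } else {
        stripWhitespaceOnlyText(child);
      }
      child = next;
    }
  }
}
```

Whether whitespace inside text content or comments should count as a change is a design decision; the sketch ignores comments and pure-whitespace text and keeps everything else significant.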

Until now I have only talked about the deployment of processes. But the deployer supports a second feature too, also introduced in the last post: correctly scoped classloading. To get this to work I configured my own ProcessClassLoader in jbpm (possible since jbpm 3.3, released this week), which is easily done in the jbpm.cfg.xml:

<string name="jbpm.classloader" value="context" />
<bean   name="jbpm.processClassLoader" class="...jbpm.deployer.SwistecProcessClassLoaderFactory" singleton="true" />

The special ProcessClassLoader now asks the deployer MBean for the correct classloader. The MBean registered the right classloaders for the processes during deployment. So if the process archive (par) was nested in an ear, the classloader of the ear was registered for the process. This is the code of the ProcessClassLoader:

/**
 * {@link ProcessClassLoader} to load process specific classes.
 * Asks the {@link SwistecJbpmDeployerMBean} which class loader is responsible
 * for the specified {@link ProcessDefinition}
 *
 * @author bernd.ruecker@camunda.com
 */
public class SwistecProcessClassLoader extends ProcessClassLoader {

  private Log log = LogFactory.getLog(this.getClass());

  private static final String mbeanName = "jboss.esb:service=JbpmDeployer";

  private ProcessDefinition processDefinition = null;

  private SwistecJbpmDeployerMBean jbpmDeployer = null;

  public SwistecProcessClassLoader( ClassLoader parent, ProcessDefinition processDefinition ) {
    super( Thread.currentThread().getContextClassLoader(),
         processDefinition );
    this.processDefinition = processDefinition;
  }

  /**
   * finds the correct context {@link ClassLoader} for the application
   * which defined the specified {@link ProcessDefinition}
   */
  public ClassLoader getApplicationContextClassloader() {
    SwistecJbpmDeployerMBean deployer = getJbpmDeployer();

    ClassLoader classLoader = deployer.getProcessClassloader(
        processDefinition.getName());

    if (classLoader==null)
      throw new IllegalStateException("No classloader registered for process " + processDefinition + ". Is the jbpm.deployer working and the par (process archive) is deployed correctly?");

    if (log.isDebugEnabled())
        log.debug("resolved " + classLoader + " for process definition " + processDefinition);

    return classLoader;
  }

  private SwistecJbpmDeployerMBean getJbpmDeployer() {
    try {
      if (jbpmDeployer == null) {
        MBeanServer server = MBeanServerLocator.locate();
        jbpmDeployer = (SwistecJbpmDeployerMBean) MBeanProxyExt.create(
            SwistecJbpmDeployerMBean.class,
            mbeanName,
            server);
      }
      return jbpmDeployer;
    }
    catch (Throwable ex) {
      throw new ConfigurationException("couldn't connect to JbpmDeployer MBean '"+mbeanName+"'. Is it deployed correctly on your system?", ex);
    }
  }

  public URL findResource(String name) {
    // try to find it in the jbpm database
    URL url = super.findResource(name);

    // if not found, search in the ear classloader
    if (url==null)
      url = getApplicationContextClassloader().getResource(name);

    return url;
  }

  public Class findClass(String name) throws ClassNotFoundException {
    Class clazz = null;

    FileDefinition fileDefinition = processDefinition.getFileDefinition();
    if (fileDefinition!=null) {
      String fileName = "classes/" + name.replace( '.', '/' ) + ".class";
      byte[] classBytes;
      try {
        classBytes = fileDefinition.getBytes(fileName);
        clazz = defineClass(name, classBytes, 0, classBytes.length);
      } catch (JbpmException e) {
        clazz = null;
      }
    }

    if (clazz==null) {
      // try to load it from the scoped EAR classloader
      // if not found in process database
      clazz = getApplicationContextClassloader().loadClass(name);
    }

    if (clazz==null) {
      throw new ClassNotFoundException("class '"+name+"' could not be found by the process classloader");
    }

    return clazz;
  }
}
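
The lookup order used in findClass (first the classes stored with the process, then the classloader of the application) can be tried out without jBPM or JBoss. The following is just a sketch under that assumption; FallbackClassLoader and its constructor parameters are invented for illustration, and the map of class bytes stands in for the FileDefinition of a process:

```java
import java.util.Map;

/**
 * Hypothetical sketch: look for class bytes in an own store first and
 * delegate to a second ("application") classloader if nothing is found.
 */
class FallbackClassLoader extends ClassLoader {

  private final Map<String, byte[]> ownClasses;
  private final ClassLoader applicationLoader;

  FallbackClassLoader(Map<String, byte[]> ownClasses, ClassLoader applicationLoader) {
    // no parent besides the bootstrap loader, so that application
    // classes always end up in findClass
    super((ClassLoader) null);
    this.ownClasses = ownClasses;
    this.applicationLoader = applicationLoader;
  }

  @Override
  protected Class<?> findClass(String name) throws ClassNotFoundException {
    byte[] bytes = ownClasses.get(name);
    if (bytes != null) {
      // class was stored with the process itself
      return defineClass(name, bytes, 0, bytes.length);
    }
    // otherwise ask the application classloader; it throws
    // ClassNotFoundException itself if the class is unknown
    return applicationLoader.loadClass(name);
  }
}
```

Because loadClass of the delegate already throws a ClassNotFoundException for unknown classes, no additional null check is needed after the fallback.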

Finally, I want to apologize that I didn’t find the time to lay out or explain the code better here. I am quite happy that I somehow managed to get it online this week 😉

I am waiting for your feedback!
