Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-9455: Introduce IntegrationKeepAlive #9493

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.springframework.integration.config.xml.IntegrationNamespaceUtils;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.context.IntegrationProperties;
import org.springframework.integration.endpoint.management.IntegrationKeepAlive;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.integration.handler.support.IntegrationMessageHandlerMethodFactory;
import org.springframework.integration.json.JsonPathUtils;
Expand Down Expand Up @@ -129,6 +130,7 @@ public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) t
registerListMessageHandlerMethodFactory();
registerIntegrationConfigurationReport();
registerControlBusCommandRegistry();
registerKeepAlive();
}

@Override
Expand Down Expand Up @@ -460,4 +462,15 @@ private static BeanDefinitionBuilder createMessageHandlerMethodFactoryBeanDefini
IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME);
}

private void registerKeepAlive() {
if (!this.beanFactory.containsBean(IntegrationContextUtils.INTEGRATION_KEEP_ALIVE_BEAN_NAME)) {
BeanDefinitionBuilder builder =
BeanDefinitionBuilder.genericBeanDefinition(IntegrationKeepAlive.class)
.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);

this.registry.registerBeanDefinition(IntegrationContextUtils.INTEGRATION_KEEP_ALIVE_BEAN_NAME,
builder.getBeanDefinition());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,18 @@ public abstract class IntegrationContextUtils {

public static final String LIST_MESSAGE_HANDLER_FACTORY_BEAN_NAME = "integrationListMessageHandlerMethodFactory";

/**
* The bean name for the {@code org.springframework.integration.support.management.ControlBusCommandRegistry}.
* @since 6.4
*/
public static final String CONTROL_BUS_COMMAND_REGISTRY_BEAN_NAME = "controlBusCommandRegistry";

/**
* The bean name for the {@code org.springframework.integration.endpoint.management.IntegrationKeepAlive}.
* @since 6.4
*/
public static final String INTEGRATION_KEEP_ALIVE_BEAN_NAME = "integrationKeepAlive";

/**
* The default timeout for blocking operations like send and receive messages.
* @since 6.1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

/**
* Utility class to encapsulate infrastructure Integration properties constants and their default values.
* The default values can be overridden by the {@code META-INF/spring.integration.properties} with this entries
* The default values can be overridden by the {@code META-INF/spring.integration.properties} with these entries
* (includes their default values):
* <ul>
* <li> {@code spring.integration.channels.autoCreate=true}
Expand All @@ -38,6 +38,7 @@
* <li> {@code spring.integration.channels.error.requireSubscribers=true}
* <li> {@code spring.integration.channels.error.ignoreFailures=true}
* <li> {@code spring.integration.endpoints.defaultTimeout=30000}
* <li> {@code spring.integration.keepAlive=true}
* </ul>
*
* @author Artem Bilan
Expand Down Expand Up @@ -117,6 +118,12 @@ public final class IntegrationProperties {
*/
public static final String ENDPOINTS_DEFAULT_TIMEOUT = INTEGRATION_PROPERTIES_PREFIX + "endpoints.defaultTimeout";

/**
* Set to {@code false} to fully disable Keep-Alive thread.
* @since 6.4
*/
public static final String KEEP_ALIVE = INTEGRATION_PROPERTIES_PREFIX + "keepAlive";

private static final Properties DEFAULTS;

private boolean channelsAutoCreate = true;
Expand All @@ -139,6 +146,8 @@ public final class IntegrationProperties {

private long endpointsDefaultTimeout = IntegrationContextUtils.DEFAULT_TIMEOUT;

private boolean keepAlive = true;

private volatile Properties properties;

static {
Expand Down Expand Up @@ -312,11 +321,30 @@ public long getEndpointsDefaultTimeout() {
/**
* Configure a value for {@link #ENDPOINTS_DEFAULT_TIMEOUT} option.
* @param endpointsDefaultTimeout the value for {@link #ENDPOINTS_DEFAULT_TIMEOUT} option.
* @since 6.2
*/
public void setEndpointsDefaultTimeout(long endpointsDefaultTimeout) {
this.endpointsDefaultTimeout = endpointsDefaultTimeout;
}

/**
* Return the value of {@link #KEEP_ALIVE} option.
* @return the value of {@link #KEEP_ALIVE} option.
* @since 6.4
*/
public boolean isKeepAlive() {
return this.keepAlive;
}

/**
* Configure a value for {@link #KEEP_ALIVE} option.
* Defaults {@code true} - set to {@code false} disable keep-alive thread.
* @param keepAlive {@code false} to disable keep-alive thread.
*/
public void setKeepAlive(boolean keepAlive) {
this.keepAlive = keepAlive;
}

/**
* Represent the current instance as a {@link Properties}.
* @return the {@link Properties} representation.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.endpoint.management;

import java.time.Instant;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.context.SmartLifecycle;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.context.IntegrationProperties;
import org.springframework.integration.endpoint.AbstractPollingEndpoint;
import org.springframework.scheduling.TaskScheduler;

/**
* The component to keep an application alive when there are no non-daemon threads.
* Some application might just not rely on the loops in specific threads for their logic.
* Or target protocol to integrate with communicates via daemon threads.
* <p>
* A bean for this class is registered automatically by Spring Integration infrastructure.
* It is started by application context for a blocked keep-alive dedicated thread
* only if there is no {@link AbstractPollingEndpoint} beans in the application context
* or {@link TaskScheduler} is configured for daemon (or virtual) threads.
* <p>
* Can be stopped (or started respectively) manually after injection into some target service if found redundant.
* <p>
* The {@link IntegrationProperties#KEEP_ALIVE} integration global
* property can be set to {@code false} to disable this component regardless of the application logic.
*
* @author Artem Bilan
*
* @since 6.4
*/
public class IntegrationKeepAlive implements SmartLifecycle, SmartInitializingSingleton, BeanFactoryAware {

private static final Log LOG = LogFactory.getLog(IntegrationKeepAlive.class);

private final AtomicBoolean running = new AtomicBoolean();

private BeanFactory beanFactory;

private boolean autoStartup;

private volatile Thread keepAliveThread;

@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.beanFactory = beanFactory;
}

@Override
public void afterSingletonsInstantiated() {
IntegrationProperties integrationProperties = IntegrationContextUtils.getIntegrationProperties(this.beanFactory);
this.autoStartup =
integrationProperties.isKeepAlive()
&& (isTaskSchedulerDaemon() || !isAbstractPollingEndpointPresent());
}

private boolean isTaskSchedulerDaemon() {
TaskScheduler taskScheduler = IntegrationContextUtils.getTaskScheduler(this.beanFactory);
AtomicBoolean isDaemon = new AtomicBoolean();
CountDownLatch checkDaemonThreadLatch = new CountDownLatch(1);
taskScheduler.schedule(() -> {
isDaemon.set(Thread.currentThread().isDaemon());
checkDaemonThreadLatch.countDown();
}, Instant.now());

boolean logWarning = false;
try {
if (!checkDaemonThreadLatch.await(10, TimeUnit.SECONDS)) {
logWarning = true;
}
}
catch (InterruptedException ex) {
logWarning = true;
}
if (logWarning) {
LOG.warn("The 'IntegrationKeepAlive' cannot check a 'TaskScheduler' daemon threads status. " +
"Falling back to 'keep-alive'");
}
return isDaemon.get();
}

private boolean isAbstractPollingEndpointPresent() {
return this.beanFactory.getBeanProvider(AbstractPollingEndpoint.class)
.stream()
.findAny()
.isPresent();
}

@Override
public boolean isAutoStartup() {
return this.autoStartup;
}

@Override
public void start() {
if (this.running.compareAndSet(false, true)) {
this.keepAliveThread =
new Thread(() -> {
while (true) {
try {
Thread.sleep(Long.MAX_VALUE);
}
catch (InterruptedException ex) {
break;
}
}
});
this.keepAliveThread.setDaemon(false);
this.keepAliveThread.setName("spring-integration-keep-alive");
this.keepAliveThread.start();
}
}

@Override
public void stop() {
if (this.running.compareAndSet(true, false)) {
this.keepAliveThread.interrupt();
}
}

@Override
public boolean isRunning() {
return this.running.get();
}

}
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
/**
* Provides classes related to endpoint management.
*/
@org.springframework.lang.NonNullApi
@org.springframework.lang.NonNullFields
package org.springframework.integration.endpoint.management;
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ spring.integration.messagingTemplate.throwExceptionOnLateReply=false
spring.integration.readOnly.headers=
spring.integration.endpoints.noAutoStartup=
spring.integration.endpoints.defaultTimeout=30000
spring.integration.keepAlive=true
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.integration.endpoint.AbstractEndpoint;
import org.springframework.integration.endpoint.management.IntegrationKeepAlive;
import org.springframework.integration.test.util.TestUtils;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.test.annotation.DirtiesContext;
Expand Down Expand Up @@ -52,6 +53,9 @@ public class IntegrationContextTests {
@Autowired
private ThreadPoolTaskScheduler taskScheduler;

@Autowired
private IntegrationKeepAlive integrationKeepAlive;

@Test
public void testIntegrationContextComponents() {
assertThat(this.integrationProperties.isMessagingTemplateThrowExceptionOnLateReply()).isTrue();
Expand All @@ -62,6 +66,7 @@ public void testIntegrationContextComponents() {
assertThat(this.serviceActivator.isRunning()).isFalse();
assertThat(this.serviceActivatorExplicit.isAutoStartup()).isTrue();
assertThat(this.serviceActivatorExplicit.isRunning()).isTrue();
assertThat(this.integrationKeepAlive.isRunning()).isTrue();
}

}
Loading