diff --git a/README.md b/README.md index 55823fd4af531..8b84773b6fa02 100644 --- a/README.md +++ b/README.md @@ -58,7 +58,7 @@ The beginner examples are another powerful alternative pathway for getting start **Building** - + **Contributions** diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/bean.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/bean.json index 75f019fc72754..c9abb38b12e1d 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/bean.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/bean.json @@ -24,7 +24,8 @@ "componentProperties": { "lazyStartProducer": { "index": 0, "kind": "property", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and starting the producer may take a little time and prolong the total processing time of the processing." }, "scope": { "index": 1, "kind": "property", "displayName": "Scope", "group": "producer", "label": "", "required": false, "type": "object", "javaType": "org.apache.camel.BeanScope", "enum": [ "Singleton", "Request", "Prototype" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "Singleton", "description": "Scope of bean. When using singleton scope (default) the bean is created or looked up only once and reused for the lifetime of the endpoint. The bean should be thread-safe in case concurrent threads is calling the bean at the same time. When using request scope the bean is created or looked up once per request (exchange). This can be used if you want to store state on a bean while processing a request and you want to call the same bean instance multiple times while processing the request. The bean does not have to be thread-safe as the instance is only called from the same request. When using delegate scope, then the bean will be looked up or created per call. However in case of lookup then this is delegated to the bean registry such as Spring or CDI (if in use), which depends on their configuration can act as either singleton or prototype scope. so when using prototype then this depends on the delegated registry." }, - "autowiredEnabled": { "index": 2, "kind": "property", "displayName": "Autowired Enabled", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc." } + "autowiredEnabled": { "index": 2, "kind": "property", "displayName": "Autowired Enabled", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc." }, + "beanInfoCacheSize": { "index": 3, "kind": "property", "displayName": "Bean Info Cache Size", "group": "advanced", "label": "advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1000, "description": "Maximum cache size of internal cache for bean introspection. Setting a value of 0 or negative will disable the cache." } }, "headers": { "CamelBeanMethodName": { "index": 0, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The name of the method to invoke.", "constantName": "org.apache.camel.component.bean.BeanConstants#BEAN_METHOD_NAME" } diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/class.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/class.json index 251ab2355401c..a870e29457567 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/class.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/class.json @@ -24,7 +24,8 @@ "componentProperties": { "lazyStartProducer": { "index": 0, "kind": "property", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and starting the producer may take a little time and prolong the total processing time of the processing." }, "scope": { "index": 1, "kind": "property", "displayName": "Scope", "group": "producer", "label": "", "required": false, "type": "object", "javaType": "org.apache.camel.BeanScope", "enum": [ "Singleton", "Request", "Prototype" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "Singleton", "description": "Scope of bean. When using singleton scope (default) the bean is created or looked up only once and reused for the lifetime of the endpoint. The bean should be thread-safe in case concurrent threads is calling the bean at the same time. When using request scope the bean is created or looked up once per request (exchange). This can be used if you want to store state on a bean while processing a request and you want to call the same bean instance multiple times while processing the request. The bean does not have to be thread-safe as the instance is only called from the same request. When using delegate scope, then the bean will be looked up or created per call. However in case of lookup then this is delegated to the bean registry such as Spring or CDI (if in use), which depends on their configuration can act as either singleton or prototype scope. so when using prototype then this depends on the delegated registry." }, - "autowiredEnabled": { "index": 2, "kind": "property", "displayName": "Autowired Enabled", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc." } + "autowiredEnabled": { "index": 2, "kind": "property", "displayName": "Autowired Enabled", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc." }, + "beanInfoCacheSize": { "index": 3, "kind": "property", "displayName": "Bean Info Cache Size", "group": "advanced", "label": "advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1000, "description": "Maximum cache size of internal cache for bean introspection. Setting a value of 0 or negative will disable the cache." } }, "headers": { "CamelBeanMethodName": { "index": 0, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The name of the method to invoke.", "constantName": "org.apache.camel.component.bean.BeanConstants#BEAN_METHOD_NAME" } diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/releases/camel-releases.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/releases/camel-releases.json index 68823ef2b77c9..b1f56ed4a78aa 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/releases/camel-releases.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/releases/camel-releases.json @@ -763,5 +763,10 @@ "eol": "2024-08-14", "kind": "lts", "jdk": "17" + }, + { + "version": "4.1.0", + "date": "2023-10-11", + "jdk": "17" } ] diff --git a/components/camel-aws/camel-aws2-ddb/pom.xml b/components/camel-aws/camel-aws2-ddb/pom.xml index ee0fb212ba544..4b3350684d39f 100644 --- a/components/camel-aws/camel-aws2-ddb/pom.xml +++ b/components/camel-aws/camel-aws2-ddb/pom.xml @@ -30,7 +30,7 @@ jar Camel :: AWS2 DDB - A Camel Amazon DynamoDB Web Service Component Version 2 + Store and retrieve data and events from Amazon DynamoDB service diff --git a/components/camel-aws/camel-aws2-ddb/src/generated/resources/META-INF/services/org/apache/camel/component.properties b/components/camel-aws/camel-aws2-ddb/src/generated/resources/META-INF/services/org/apache/camel/component.properties index 2e5a2ef611f37..5c4e78c511e27 100644 --- a/components/camel-aws/camel-aws2-ddb/src/generated/resources/META-INF/services/org/apache/camel/component.properties +++ b/components/camel-aws/camel-aws2-ddb/src/generated/resources/META-INF/services/org/apache/camel/component.properties @@ -4,4 +4,4 @@ groupId=org.apache.camel artifactId=camel-aws2-ddb version=4.2.0-SNAPSHOT projectName=Camel :: AWS2 DDB -projectDescription=A Camel Amazon DynamoDB Web Service Component Version 2 +projectDescription=Store and retrieve data and events from Amazon DynamoDB service diff --git a/components/camel-bean/src/generated/java/org/apache/camel/component/bean/BeanComponentConfigurer.java b/components/camel-bean/src/generated/java/org/apache/camel/component/bean/BeanComponentConfigurer.java index d508722a03dca..5a94083acf09d 100644 --- a/components/camel-bean/src/generated/java/org/apache/camel/component/bean/BeanComponentConfigurer.java +++ b/components/camel-bean/src/generated/java/org/apache/camel/component/bean/BeanComponentConfigurer.java @@ -23,6 +23,8 @@ public boolean configure(CamelContext camelContext, Object obj, String name, Obj switch (ignoreCase ? name.toLowerCase() : name) { case "autowiredenabled": case "autowiredEnabled": target.setAutowiredEnabled(property(camelContext, boolean.class, value)); return true; + case "beaninfocachesize": + case "beanInfoCacheSize": target.setBeanInfoCacheSize(property(camelContext, int.class, value)); return true; case "lazystartproducer": case "lazyStartProducer": target.setLazyStartProducer(property(camelContext, boolean.class, value)); return true; case "scope": target.setScope(property(camelContext, org.apache.camel.BeanScope.class, value)); return true; @@ -35,6 +37,8 @@ public Class getOptionType(String name, boolean ignoreCase) { switch (ignoreCase ? name.toLowerCase() : name) { case "autowiredenabled": case "autowiredEnabled": return boolean.class; + case "beaninfocachesize": + case "beanInfoCacheSize": return int.class; case "lazystartproducer": case "lazyStartProducer": return boolean.class; case "scope": return org.apache.camel.BeanScope.class; @@ -48,6 +52,8 @@ public Object getOptionValue(Object obj, String name, boolean ignoreCase) { switch (ignoreCase ? name.toLowerCase() : name) { case "autowiredenabled": case "autowiredEnabled": return target.isAutowiredEnabled(); + case "beaninfocachesize": + case "beanInfoCacheSize": return target.getBeanInfoCacheSize(); case "lazystartproducer": case "lazyStartProducer": return target.isLazyStartProducer(); case "scope": return target.getScope(); diff --git a/components/camel-bean/src/generated/resources/org/apache/camel/component/bean/bean.json b/components/camel-bean/src/generated/resources/org/apache/camel/component/bean/bean.json index 75f019fc72754..c9abb38b12e1d 100644 --- a/components/camel-bean/src/generated/resources/org/apache/camel/component/bean/bean.json +++ b/components/camel-bean/src/generated/resources/org/apache/camel/component/bean/bean.json @@ -24,7 +24,8 @@ "componentProperties": { "lazyStartProducer": { "index": 0, "kind": "property", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and starting the producer may take a little time and prolong the total processing time of the processing." }, "scope": { "index": 1, "kind": "property", "displayName": "Scope", "group": "producer", "label": "", "required": false, "type": "object", "javaType": "org.apache.camel.BeanScope", "enum": [ "Singleton", "Request", "Prototype" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "Singleton", "description": "Scope of bean. When using singleton scope (default) the bean is created or looked up only once and reused for the lifetime of the endpoint. The bean should be thread-safe in case concurrent threads is calling the bean at the same time. When using request scope the bean is created or looked up once per request (exchange). This can be used if you want to store state on a bean while processing a request and you want to call the same bean instance multiple times while processing the request. The bean does not have to be thread-safe as the instance is only called from the same request. When using delegate scope, then the bean will be looked up or created per call. However in case of lookup then this is delegated to the bean registry such as Spring or CDI (if in use), which depends on their configuration can act as either singleton or prototype scope. so when using prototype then this depends on the delegated registry." }, - "autowiredEnabled": { "index": 2, "kind": "property", "displayName": "Autowired Enabled", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc." } + "autowiredEnabled": { "index": 2, "kind": "property", "displayName": "Autowired Enabled", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc." }, + "beanInfoCacheSize": { "index": 3, "kind": "property", "displayName": "Bean Info Cache Size", "group": "advanced", "label": "advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1000, "description": "Maximum cache size of internal cache for bean introspection. Setting a value of 0 or negative will disable the cache." } }, "headers": { "CamelBeanMethodName": { "index": 0, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The name of the method to invoke.", "constantName": "org.apache.camel.component.bean.BeanConstants#BEAN_METHOD_NAME" } diff --git a/components/camel-bean/src/generated/resources/org/apache/camel/component/beanclass/class.json b/components/camel-bean/src/generated/resources/org/apache/camel/component/beanclass/class.json index 251ab2355401c..a870e29457567 100644 --- a/components/camel-bean/src/generated/resources/org/apache/camel/component/beanclass/class.json +++ b/components/camel-bean/src/generated/resources/org/apache/camel/component/beanclass/class.json @@ -24,7 +24,8 @@ "componentProperties": { "lazyStartProducer": { "index": 0, "kind": "property", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and starting the producer may take a little time and prolong the total processing time of the processing." }, "scope": { "index": 1, "kind": "property", "displayName": "Scope", "group": "producer", "label": "", "required": false, "type": "object", "javaType": "org.apache.camel.BeanScope", "enum": [ "Singleton", "Request", "Prototype" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "Singleton", "description": "Scope of bean. When using singleton scope (default) the bean is created or looked up only once and reused for the lifetime of the endpoint. The bean should be thread-safe in case concurrent threads is calling the bean at the same time. When using request scope the bean is created or looked up once per request (exchange). This can be used if you want to store state on a bean while processing a request and you want to call the same bean instance multiple times while processing the request. The bean does not have to be thread-safe as the instance is only called from the same request. When using delegate scope, then the bean will be looked up or created per call. However in case of lookup then this is delegated to the bean registry such as Spring or CDI (if in use), which depends on their configuration can act as either singleton or prototype scope. so when using prototype then this depends on the delegated registry." }, - "autowiredEnabled": { "index": 2, "kind": "property", "displayName": "Autowired Enabled", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc." } + "autowiredEnabled": { "index": 2, "kind": "property", "displayName": "Autowired Enabled", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc." }, + "beanInfoCacheSize": { "index": 3, "kind": "property", "displayName": "Bean Info Cache Size", "group": "advanced", "label": "advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1000, "description": "Maximum cache size of internal cache for bean introspection. Setting a value of 0 or negative will disable the cache." } }, "headers": { "CamelBeanMethodName": { "index": 0, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The name of the method to invoke.", "constantName": "org.apache.camel.component.bean.BeanConstants#BEAN_METHOD_NAME" } diff --git a/components/camel-bean/src/main/java/org/apache/camel/component/bean/BeanComponent.java b/components/camel-bean/src/main/java/org/apache/camel/component/bean/BeanComponent.java index 7381aa92c308c..7fb4a8f1e5496 100644 --- a/components/camel-bean/src/main/java/org/apache/camel/component/bean/BeanComponent.java +++ b/components/camel-bean/src/main/java/org/apache/camel/component/bean/BeanComponent.java @@ -38,7 +38,7 @@ public class BeanComponent extends DefaultComponent { // use an internal soft cache for BeanInfo as they are costly to introspect // for example the bean language using OGNL expression runs much faster reusing the BeanInfo from this cache - private final Map beanInfoCache = LRUCacheFactory.newLRUSoftCache(1000); + private Map beanInfoCache; @Metadata(defaultValue = "Singleton", description = "Scope of bean." + " When using singleton scope (default) the bean is created or looked up only once and reused for the lifetime of the endpoint." @@ -51,9 +51,23 @@ public class BeanComponent extends DefaultComponent { + " so when using prototype then this depends on the delegated registry.") private BeanScope scope = BeanScope.Singleton; + @Metadata(label = "advanced", defaultValue = "1000", + description = "Maximum cache size of internal cache for bean introspection. Setting a value of 0 or negative will disable the cache.") + private int beanInfoCacheSize = 1000; + public BeanComponent() { } + @Override + protected void doInit() throws Exception { + super.doInit(); + + if (beanInfoCache == null && beanInfoCacheSize > 0) { + LOG.debug("Creating BeanInfo with maximum cache size: {}", beanInfoCacheSize); + beanInfoCache = LRUCacheFactory.newLRUSoftCache(beanInfoCacheSize); + } + } + // Implementation methods //----------------------------------------------------------------------- @Override @@ -70,11 +84,17 @@ protected Endpoint createEndpoint(String uri, String remaining, Map> iterator() { + dataSet.goTop(); + return new Iterator>() { Optional nextData = Optional.empty(); diff --git a/components/camel-flatpack/src/test/java/org/apache/camel/component/flatpack/FlatpackDataSetListIteratorTest.java b/components/camel-flatpack/src/test/java/org/apache/camel/component/flatpack/FlatpackDataSetListIteratorTest.java new file mode 100644 index 0000000000000..1327216bbbd55 --- /dev/null +++ b/components/camel-flatpack/src/test/java/org/apache/camel/component/flatpack/FlatpackDataSetListIteratorTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.flatpack; + +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.camel.EndpointInject; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.dataformat.flatpack.FlatpackDataFormat; +import org.apache.camel.test.junit5.CamelTestSupport; +import org.junit.jupiter.api.Test; + +public class FlatpackDataSetListIteratorTest extends CamelTestSupport { + + private static final String DIRECT_URI = "direct:flatpach-test"; + + @EndpointInject("mock:each-item") + private MockEndpoint eachItemEndpoint; + + @Test + void sendTESTwithErrors() throws Exception { + String header = "A,B,C"; + String record1 = "1,2,3"; + String record2 = "7,8,9"; + Object body = Stream.of(header, record1, record2).collect(Collectors.joining("\r\n")); + + eachItemEndpoint.expectedMessageCount(2); + + template.sendBody(DIRECT_URI, body); + + eachItemEndpoint.assertIsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + FlatpackDataFormat format = new FlatpackDataFormat(); + format.setDelimiter(','); + format.setTextQualifier('"'); + + return new RouteBuilder() { + @Override + public void configure() { + from(DIRECT_URI) + .unmarshal(format) + .log("Processing ${body}") + .split().body() + .to("mock:each-item") + .end(); + } + }; + } + +} diff --git a/components/camel-kubernetes/src/generated/java/io/fabric8/kubernetes/client/ConfigFluentImplConfigurer.java b/components/camel-kubernetes/src/generated/java/io/fabric8/kubernetes/client/ConfigFluentImplConfigurer.java index 8d9a1560aa4c9..02a6410f7bf00 100644 --- a/components/camel-kubernetes/src/generated/java/io/fabric8/kubernetes/client/ConfigFluentImplConfigurer.java +++ b/components/camel-kubernetes/src/generated/java/io/fabric8/kubernetes/client/ConfigFluentImplConfigurer.java @@ -91,6 +91,8 @@ public boolean configure(CamelContext camelContext, Object obj, String name, Obj case "OauthToken": target.withOauthToken(property(camelContext, java.lang.String.class, value)); return true; case "oauthtokenprovider": case "OauthTokenProvider": target.withOauthTokenProvider(property(camelContext, io.fabric8.kubernetes.client.OAuthTokenProvider.class, value)); return true; + case "onlyhttpwatches": + case "OnlyHttpWatches": target.withOnlyHttpWatches(property(camelContext, boolean.class, value)); return true; case "password": case "Password": target.withPassword(property(camelContext, java.lang.String.class, value)); return true; case "proxypassword": @@ -202,6 +204,8 @@ public Class getOptionType(String name, boolean ignoreCase) { case "OauthToken": return java.lang.String.class; case "oauthtokenprovider": case "OauthTokenProvider": return io.fabric8.kubernetes.client.OAuthTokenProvider.class; + case "onlyhttpwatches": + case "OnlyHttpWatches": return boolean.class; case "password": case "Password": return java.lang.String.class; case "proxypassword": @@ -314,6 +318,8 @@ public Object getOptionValue(Object obj, String name, boolean ignoreCase) { case "OauthToken": return target.getOauthToken(); case "oauthtokenprovider": case "OauthTokenProvider": return target.getOauthTokenProvider(); + case "onlyhttpwatches": + case "OnlyHttpWatches": return target.isOnlyHttpWatches(); case "password": case "Password": return target.getPassword(); case "proxypassword": diff --git a/components/camel-platform-http-vertx/pom.xml b/components/camel-platform-http-vertx/pom.xml index ec47987865f9a..200bdc6cd3cdc 100644 --- a/components/camel-platform-http-vertx/pom.xml +++ b/components/camel-platform-http-vertx/pom.xml @@ -108,6 +108,12 @@ ${wiremock-version} test + + org.awaitility + awaitility + ${awaitility-version} + test + diff --git a/components/camel-platform-http-vertx/src/main/docs/platform-http-vertx.adoc b/components/camel-platform-http-vertx/src/main/docs/platform-http-vertx.adoc index e4f7ac16dfb49..118225964ff7f 100644 --- a/components/camel-platform-http-vertx/src/main/docs/platform-http-vertx.adoc +++ b/components/camel-platform-http-vertx/src/main/docs/platform-http-vertx.adoc @@ -76,3 +76,23 @@ from("platform-http:proxy") .toD("http://" + "${headers." + Exchange.HTTP_HOST + "}"); ---- + +== Access to Request and Response + +The Vertx HTTP server has its own API abstraction for HTTP request/response objects which you can access via +Camel `HttpMessage` as shown in the custom `Processor` below : + +[source,java] +---- +.process(exchange -> { + // grab message as HttpMessage + HttpMessage message = exchange.getMessage(HttpMessage.class); + // use getRequest() / getResponse() to access Vertx directly + // you can add custom headers + message.getResponse().putHeader("beer", "Heineken"); + // also access request details and use that in the code + String p = message.getRequest().path(); + message.setBody("request path: " + p); +}); +---- + diff --git a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/HttpMessage.java b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/HttpMessage.java new file mode 100644 index 0000000000000..cece097b89c5d --- /dev/null +++ b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/HttpMessage.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.platform.http.vertx; + +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.http.HttpServerResponse; +import org.apache.camel.Exchange; +import org.apache.camel.support.DefaultMessage; +import org.apache.camel.util.ObjectHelper; + +public class HttpMessage extends DefaultMessage { + + private HttpServerRequest request; + private HttpServerResponse response; + + public HttpMessage(Exchange exchange, HttpServerRequest request, HttpServerResponse response) { + super(exchange); + init(exchange, request, response); + } + + public void init(Exchange exchange, HttpServerRequest request, HttpServerResponse response) { + setExchange(exchange); + this.request = request; + this.response = response; + } + + @Override + public void reset() { + super.reset(); + request = null; + response = null; + } + + public HttpServerRequest getRequest() { + return request; + } + + public HttpServerResponse getResponse() { + return response; + } + + @Override + public HttpMessage newInstance() { + return new HttpMessage(getExchange(), request, response); + } + + @Override + public String toString() { + // do not use toString on HTTP message + return "HttpMessage@" + ObjectHelper.getIdentityHashCode(this); + } +} diff --git a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java index 4ab809ec41134..4b96d86c504f0 100644 --- a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java +++ b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java @@ -230,7 +230,16 @@ protected Exchange toExchange(RoutingContext ctx) { final Exchange exchange = createExchange(false); exchange.setPattern(ExchangePattern.InOut); - final Message in = toCamelMessage(ctx, exchange); + // reuse existing http message if pooled + Message in = exchange.getIn(); + if (in instanceof HttpMessage hm) { + hm.init(exchange, ctx.request(), ctx.response()); + } else { + in = new HttpMessage(exchange, ctx.request(), ctx.response()); + exchange.setMessage(in); + } + populateCamelMessage(ctx, exchange, in); + final String charset = ctx.parsedHeaders().contentType().parameter("charset"); if (charset != null) { exchange.setProperty(ExchangePropertyKey.CHARSET_NAME, charset); @@ -245,9 +254,7 @@ protected Exchange toExchange(RoutingContext ctx) { return exchange; } - protected Message toCamelMessage(RoutingContext ctx, Exchange exchange) { - final Message result = exchange.getIn(); - + protected void populateCamelMessage(RoutingContext ctx, Exchange exchange, Message result) { final HeaderFilterStrategy headerFilterStrategy = getEndpoint().getHeaderFilterStrategy(); populateCamelHeaders(ctx, result.getHeaders(), exchange, headerFilterStrategy); final String mimeType = ctx.parsedHeaders().contentType().value(); @@ -281,7 +288,6 @@ protected Message toCamelMessage(RoutingContext ctx, Exchange exchange) { result.setBody(null); } } - return result; } protected void populateAttachments(List uploads, Message message) { diff --git a/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpEngineTest.java b/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpEngineTest.java index 683b2327e1d8a..633826ad9592a 100644 --- a/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpEngineTest.java +++ b/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpEngineTest.java @@ -846,6 +846,36 @@ public void configure() { } } + @Test + public void testVertxRequestResponseObjects() throws Exception { + final CamelContext context = createCamelContext(); + + try { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() { + from("platform-http:/vertx/objects") + .process(exchange -> { + HttpMessage message = exchange.getMessage(HttpMessage.class); + String p = message.getRequest().path(); + message.getResponse().putHeader("beer", "Heineken"); + message.setBody("request path: " + p); + }); + } + }); + + context.start(); + + get("/vertx/objects") + .then() + .statusCode(200) + .header("beer", "Heineken") + .body(is("request path: /vertx/objects")); + } finally { + context.stop(); + } + } + static CamelContext createCamelContext() throws Exception { return createCamelContext(null); } diff --git a/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpPooledExchangeTest.java b/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpPooledExchangeTest.java new file mode 100644 index 0000000000000..05083e3a2f392 --- /dev/null +++ b/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpPooledExchangeTest.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.platform.http.vertx; + +import java.util.concurrent.TimeUnit; + +import io.restassured.RestAssured; +import org.apache.camel.CamelContext; +import org.apache.camel.ExtendedCamelContext; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.component.platform.http.PlatformHttpComponent; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.impl.engine.PooledExchangeFactory; +import org.apache.camel.impl.engine.PooledProcessorExchangeFactory; +import org.apache.camel.spi.PooledObjectFactory; +import org.apache.camel.test.AvailablePortFinder; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.Test; + +import static io.restassured.RestAssured.get; +import static org.assertj.core.api.Assertions.assertThat; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class VertxPlatformHttpPooledExchangeTest { + + @Test + public void testEngineSetup() throws Exception { + final CamelContext context = createCamelContext(); + try { + context.start(); + + assertThat(VertxPlatformHttpRouter.lookup(context)).isNotNull(); + assertThat(context.getComponent("platform-http")).isInstanceOfSatisfying(PlatformHttpComponent.class, component -> { + assertThat(component.getEngine()).isInstanceOf(VertxPlatformHttpEngine.class); + }); + + } finally { + context.stop(); + } + } + + @Test + public void testPooledExchange() throws Exception { + final CamelContext context = createCamelContext(); + + try { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() { + from("platform-http:/vertx/pooled") + .transform().simple("Bye World"); + } + }); + + context.start(); + + for (int i = 0; i < 3; i++) { + get("/vertx/pooled") + .then() + .statusCode(200) + .body(is("Bye World")); + } + + MockEndpoint.assertIsSatisfied(context); + + Awaitility.waitAtMost(5, TimeUnit.SECONDS).untilAsserted(() -> { + PooledObjectFactory.Statistics stat + = context.getCamelContextExtension().getExchangeFactoryManager().getStatistics(); + assertEquals(1, stat.getCreatedCounter()); + assertEquals(2, stat.getAcquiredCounter()); + assertEquals(3, stat.getReleasedCounter()); + assertEquals(0, stat.getDiscardedCounter()); + }); + } finally { + context.stop(); + } + } + + static CamelContext createCamelContext() throws Exception { + return createCamelContext(null); + } + + private static CamelContext createCamelContext(ServerConfigurationCustomizer customizer) throws Exception { + int port = AvailablePortFinder.getNextAvailable(); + VertxPlatformHttpServerConfiguration conf = new VertxPlatformHttpServerConfiguration(); + conf.setBindPort(port); + + RestAssured.port = port; + + if (customizer != null) { + customizer.customize(conf); + } + + CamelContext context = new DefaultCamelContext(); + context.addService(new VertxPlatformHttpServer(conf)); + + ExtendedCamelContext ecc = context.getCamelContextExtension(); + + ecc.setExchangeFactory(new PooledExchangeFactory()); + ecc.setProcessorExchangeFactory(new PooledProcessorExchangeFactory()); + ecc.getExchangeFactory().setStatisticsEnabled(true); + ecc.getProcessorExchangeFactory().setStatisticsEnabled(true); + + return context; + } + + interface ServerConfigurationCustomizer { + void customize(VertxPlatformHttpServerConfiguration configuration); + } +} diff --git a/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepository.java b/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepository.java index 37b62f97ae7b3..af70a6c5a1e52 100644 --- a/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepository.java +++ b/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepository.java @@ -32,17 +32,27 @@ public class JdbcMessageIdRepository extends AbstractJdbcMessageIdRepository { protected static final String DEFAULT_TABLENAME = "CAMEL_MESSAGEPROCESSED"; + protected static final String DEFAULT_TABLE_EXISTS_STRING = "SELECT 1 FROM CAMEL_MESSAGEPROCESSED WHERE 1 = 0"; + protected static final String DEFAULT_CREATE_STRING + = "CREATE TABLE CAMEL_MESSAGEPROCESSED (processorName VARCHAR(255), messageId VARCHAR(100), " + + "createdAt TIMESTAMP, PRIMARY KEY (processorName, messageId))"; + protected static final String DEFAULT_QUERY_STRING + = "SELECT COUNT(*) FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ? AND messageId = ?"; + protected static final String DEFAULT_INSERT_STRING + = "INSERT INTO CAMEL_MESSAGEPROCESSED (processorName, messageId, createdAt) VALUES (?, ?, ?)"; + protected static final String DEFAULT_DELETE_STRING + = "DELETE FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ? AND messageId = ?"; + protected static final String DEFAULT_CLEAR_STRING = "DELETE FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ?"; private boolean createTableIfNotExists = true; private String tableName; - private String tableExistsString = "SELECT 1 FROM CAMEL_MESSAGEPROCESSED WHERE 1 = 0"; - private String createString = "CREATE TABLE CAMEL_MESSAGEPROCESSED (processorName VARCHAR(255), messageId VARCHAR(100), " - + "createdAt TIMESTAMP, PRIMARY KEY (processorName, messageId))"; - private String queryString = "SELECT COUNT(*) FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ? AND messageId = ?"; - private String insertString = "INSERT INTO CAMEL_MESSAGEPROCESSED (processorName, messageId, createdAt) VALUES (?, ?, ?)"; - private String deleteString = "DELETE FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ? AND messageId = ?"; - private String clearString = "DELETE FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ?"; + private String tableExistsString = DEFAULT_TABLE_EXISTS_STRING; + private String createString = DEFAULT_CREATE_STRING; + private String queryString = DEFAULT_QUERY_STRING; + private String insertString = DEFAULT_INSERT_STRING; + private String deleteString = DEFAULT_DELETE_STRING; + private String clearString = DEFAULT_CLEAR_STRING; public JdbcMessageIdRepository() { } @@ -65,12 +75,12 @@ protected void doInit() throws Exception { if (tableName != null) { // update query strings from default table name to the new table name - tableExistsString = tableExistsString.replaceFirst(DEFAULT_TABLENAME, tableName); - createString = createString.replaceFirst(DEFAULT_TABLENAME, tableName); - queryString = queryString.replaceFirst(DEFAULT_TABLENAME, tableName); - insertString = insertString.replaceFirst(DEFAULT_TABLENAME, tableName); - deleteString = deleteString.replaceFirst(DEFAULT_TABLENAME, tableName); - clearString = clearString.replaceFirst(DEFAULT_TABLENAME, tableName); + tableExistsString = DEFAULT_TABLE_EXISTS_STRING.replace(DEFAULT_TABLENAME, tableName); + createString = DEFAULT_CREATE_STRING.replace(DEFAULT_TABLENAME, tableName); + queryString = DEFAULT_QUERY_STRING.replace(DEFAULT_TABLENAME, tableName); + insertString = DEFAULT_INSERT_STRING.replace(DEFAULT_TABLENAME, tableName); + deleteString = DEFAULT_DELETE_STRING.replace(DEFAULT_TABLENAME, tableName); + clearString = DEFAULT_CLEAR_STRING.replace(DEFAULT_TABLENAME, tableName); } } diff --git a/components/camel-stax/src/main/java/org/apache/camel/component/stax/StAXProcessor.java b/components/camel-stax/src/main/java/org/apache/camel/component/stax/StAXProcessor.java index b9cf046ba04c7..38fbb1f5ff654 100644 --- a/components/camel-stax/src/main/java/org/apache/camel/component/stax/StAXProcessor.java +++ b/components/camel-stax/src/main/java/org/apache/camel/component/stax/StAXProcessor.java @@ -54,7 +54,7 @@ public void process(Exchange exchange) throws Exception { StaxStreamXMLReader reader = new StaxStreamXMLReader(stream); ContentHandler handler; if (this.contentHandlerClass != null) { - handler = this.contentHandlerClass.newInstance(); + handler = this.contentHandlerClass.getDeclaredConstructor().newInstance(); } else { handler = this.contentHandler; } diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java index b7f6ac58ab135..d3acd07f20612 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java @@ -174,7 +174,6 @@ import org.apache.camel.support.jsse.SSLContextParameters; import org.apache.camel.support.service.BaseService; import org.apache.camel.support.service.ServiceHelper; -import org.apache.camel.support.startup.DefaultStartupStepRecorder; import org.apache.camel.util.IOHelper; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.StopWatch; @@ -196,12 +195,8 @@ public abstract class AbstractCamelContext extends BaseService private static final Logger LOG = LoggerFactory.getLogger(AbstractCamelContext.class); - protected final InternalServiceManager internalServiceManager; + private final InternalServiceManager internalServiceManager; - volatile StartupStepRecorder startupStepRecorder = new DefaultStartupStepRecorder(); - int defaultRouteStartupOrder = 1000; - - private final Object lock = new Object(); private final DefaultCamelContextExtension camelContextExtension = new DefaultCamelContextExtension(this); private final AtomicInteger endpointKeyCounter = new AtomicInteger(); private final List endpointStrategies = new ArrayList<>(); @@ -216,7 +211,7 @@ public abstract class AbstractCamelContext extends BaseService private final ThreadLocal isLockModel = new ThreadLocal<>(); private final Map routeServices = new LinkedHashMap<>(); private final Map suspendedRouteServices = new LinkedHashMap<>(); - private final InternalRouteStartupManager internalRouteStartupManager = new InternalRouteStartupManager(this); + private final InternalRouteStartupManager internalRouteStartupManager = new InternalRouteStartupManager(); private final List routeStartupOrder = new ArrayList<>(); private final StopWatch stopWatch = new StopWatch(false); private final ThreadLocal> componentsInCreation = ThreadLocal.withInitial(() -> new HashSet<>()); @@ -224,7 +219,7 @@ public abstract class AbstractCamelContext extends BaseService private String managementName; private ClassLoader applicationContextClassLoader; private boolean autoCreateComponents = true; - private volatile VaultConfiguration vaultConfiguration = new VaultConfiguration(); + private VaultConfiguration vaultConfiguration = new VaultConfiguration(); private final List routePolicyFactories = new ArrayList<>(); // special flags to control the first startup which can are special @@ -261,12 +256,6 @@ public abstract class AbstractCamelContext extends BaseService private Boolean autowiredEnabled = Boolean.TRUE; private Long delay; private Map globalOptions = new HashMap<>(); - private volatile TypeConverter typeConverter; - private volatile TypeConverterRegistry typeConverterRegistry; - private volatile Injector injector; - private volatile ShutdownStrategy shutdownStrategy; - private volatile ExecutorServiceManager executorServiceManager; - private volatile RouteController routeController; private EndpointRegistry endpoints; private RuntimeEndpointRegistry runtimeEndpointRegistry; private ShutdownRoute shutdownRoute = ShutdownRoute.Default; @@ -316,7 +305,7 @@ protected AbstractCamelContext(boolean build) { // add the default bootstrap closer camelContextExtension.addBootstrap(new DefaultServiceBootstrapCloseable(this)); - this.internalServiceManager = new InternalServiceManager(this, internalRouteStartupManager, startupListeners); + this.internalServiceManager = new InternalServiceManager(internalRouteStartupManager, startupListeners); initPlugins(); @@ -518,6 +507,7 @@ public Component getComponent(String name, boolean autoCreateComponents, boolean if (component != null && created.get() && autoStart && (isStarted() || isStarting())) { // If the component is looked up after the context is started, // lets start it up. + final StartupStepRecorder startupStepRecorder = camelContextExtension.getStartupStepRecorder(); StartupStep step = startupStepRecorder.beginStep(Component.class, name, "Start Component"); startService(component); startupStepRecorder.endStep(step); @@ -538,6 +528,8 @@ public Component getComponent(String name, boolean autoCreateComponents, boolean private Component initComponent(String name, boolean autoCreateComponents) { Component component = null; if (autoCreateComponents) { + final StartupStepRecorder startupStepRecorder = camelContextExtension.getStartupStepRecorder(); + StartupStep step = startupStepRecorder.beginStep(Component.class, name, "Resolve Component"); try { if (LOG.isDebugEnabled()) { @@ -734,6 +726,7 @@ public Collection removeEndpoints(String uri) throws Exception { @Override public Endpoint getEndpoint(String uri) { + final StartupStepRecorder startupStepRecorder = camelContextExtension.getStartupStepRecorder(); StartupStep step = null; // only record startup step during startup (not started) if (!isStarted() && startupStepRecorder.isEnabled()) { @@ -925,19 +918,12 @@ public GlobalEndpointConfiguration getGlobalEndpointConfiguration() { @Override public RouteController getRouteController() { - if (routeController == null) { - synchronized (lock) { - if (routeController == null) { - setRouteController(createRouteController()); - } - } - } - return routeController; + return camelContextExtension.getRouteController(); } @Override public void setRouteController(RouteController routeController) { - this.routeController = internalServiceManager.addService(routeController); + camelContextExtension.setRouteController(routeController); } @Override @@ -1057,7 +1043,7 @@ public void setLockModel(boolean lockModel) { } public void startAllRoutes() throws Exception { - internalRouteStartupManager.doStartOrResumeRoutes(routeServices, true, true, false, false); + internalRouteStartupManager.doStartOrResumeRoutes(this, routeServices, true, true, false, false); } private void doStopRoutes(RouteController controller, Comparator comparator) throws Exception { @@ -1088,6 +1074,8 @@ public void stopAllRoutes() throws Exception { // stop all routes in reverse order that they were started Comparator comparator = Comparator.comparingInt(RouteStartupOrder::getStartupOrder); + + final ShutdownStrategy shutdownStrategy = camelContextExtension.getShutdownStrategy(); if (shutdownStrategy == null || shutdownStrategy.isShutdownRoutesInReverseOrder()) { comparator = comparator.reversed(); } @@ -1102,6 +1090,7 @@ public void stopAllRoutes() throws Exception { public void removeAllRoutes() throws Exception { // stop all routes in reverse order that they were started Comparator comparator = Comparator.comparingInt(RouteStartupOrder::getStartupOrder); + final ShutdownStrategy shutdownStrategy = getShutdownStrategy(); if (shutdownStrategy == null || shutdownStrategy.isShutdownRoutesInReverseOrder()) { comparator = comparator.reversed(); } @@ -1164,7 +1153,8 @@ public synchronized boolean stopRoute( try { RouteStartupOrder route = new DefaultRouteStartupOrder(1, routeService.getRoute(), routeService); - boolean completed = getShutdownStrategy().shutdown(this, route, timeout, timeUnit, abortAfterTimeout); + boolean completed = camelContextExtension.getShutdownStrategy().shutdown(this, route, timeout, timeUnit, + abortAfterTimeout); if (completed) { // must stop route service as well stopRouteService(routeService, false, loggingLevel); @@ -1325,12 +1315,12 @@ public void addService(Object object, boolean stopOnShutdown) throws Exception { @Override public void addService(Object object, boolean stopOnShutdown, boolean forceStart) throws Exception { - internalServiceManager.doAddService(object, stopOnShutdown, forceStart, true); + internalServiceManager.doAddService(this, object, stopOnShutdown, forceStart, true); } @Override public void addPrototypeService(Object object) throws Exception { - internalServiceManager.addService(object, false, true, false); + internalServiceManager.addService(this, object, false, true, false); } @Override @@ -1367,7 +1357,7 @@ public Set hasServices(Class type) { @Override public void deferStartService(Object object, boolean stopOnShutdown) throws Exception { - internalServiceManager.deferStartService(object, stopOnShutdown, false); + internalServiceManager.deferStartService(this, object, stopOnShutdown, false); } protected List getStartupListeners() { @@ -1501,6 +1491,7 @@ public Language resolveLanguage(String name) { } private Language doResolveLanguage(String name) { + final StartupStepRecorder startupStepRecorder = camelContextExtension.getStartupStepRecorder(); StartupStep step = null; // only record startup step during startup (not started) if (!isStarted() && startupStepRecorder.isEnabled()) { @@ -1555,60 +1546,42 @@ public String resolvePropertyPlaceholders(String text) { @Override public TypeConverter getTypeConverter() { - return typeConverter; + return camelContextExtension.getTypeConverter(); } public void setTypeConverter(TypeConverter typeConverter) { - this.typeConverter = internalServiceManager.addService(typeConverter); + camelContextExtension.setTypeConverter(typeConverter); } + /** + * Get a type converter or create a new one if unset + * + * @deprecated use DefaultCamelContextExtension#getOrCreateTypeConverter() + * @return A type converter instance + */ + @Deprecated(since = "4.2.0") protected TypeConverter getOrCreateTypeConverter() { - if (typeConverter == null) { - synchronized (lock) { - if (typeConverter == null) { - setTypeConverter(createTypeConverter()); - } - } - } - return typeConverter; + return camelContextExtension.getOrCreateTypeConverter(); } @Override public TypeConverterRegistry getTypeConverterRegistry() { - if (typeConverterRegistry == null) { - synchronized (lock) { - if (typeConverterRegistry == null) { - setTypeConverterRegistry(createTypeConverterRegistry()); - } - } - } - return typeConverterRegistry; + return camelContextExtension.getTypeConverterRegistry(); } @Override public void setTypeConverterRegistry(TypeConverterRegistry typeConverterRegistry) { - this.typeConverterRegistry = internalServiceManager.addService(typeConverterRegistry); - // some registries are also a type converter implementation - if (typeConverterRegistry instanceof TypeConverter) { - this.typeConverter = (TypeConverter) typeConverterRegistry; - } + camelContextExtension.setTypeConverterRegistry(typeConverterRegistry); } @Override public Injector getInjector() { - if (injector == null) { - synchronized (lock) { - if (injector == null) { - setInjector(createInjector()); - } - } - } - return injector; + return camelContextExtension.getInjector(); } @Override public void setInjector(Injector injector) { - this.injector = internalServiceManager.addService(injector); + camelContextExtension.setInjector(injector); } @Override @@ -1851,7 +1824,7 @@ public RuntimeEndpointRegistry getRuntimeEndpointRegistry() { @Override public void setRuntimeEndpointRegistry(RuntimeEndpointRegistry runtimeEndpointRegistry) { - this.runtimeEndpointRegistry = internalServiceManager.addService(runtimeEndpointRegistry); + this.runtimeEndpointRegistry = internalServiceManager.addService(this, runtimeEndpointRegistry); } @Override @@ -1908,7 +1881,7 @@ protected void doSuspend() throws Exception { Route route = entry.getValue().getRoute(); Integer order = route.getStartupOrder(); if (order == null) { - order = defaultRouteStartupOrder++; + order = internalRouteStartupManager.incrementRouteStartupOrder(); } orders.add(new DefaultRouteStartupOrder(order, route, entry.getValue())); } @@ -1949,7 +1922,7 @@ protected void doResume() throws Exception { // start the suspended routes (do not check for route clashes, and // indicate) - internalRouteStartupManager.doStartOrResumeRoutes(suspendedRouteServices, false, true, true, false); + internalRouteStartupManager.doStartOrResumeRoutes(this, suspendedRouteServices, false, true, true, false); // mark the route services as resumed (will be marked as started) as // well @@ -2067,11 +2040,13 @@ public void doBuild() throws Exception { getCamelContextExtension().addContextPlugin(NodeIdFactory.class, createNodeIdFactory()); // auto-detect step recorder from classpath if none has been explicit configured + StartupStepRecorder startupStepRecorder = camelContextExtension.getStartupStepRecorder(); if (startupStepRecorder.getClass().getSimpleName().equals("DefaultStartupStepRecorder")) { StartupStepRecorder fr = camelContextExtension.getBootstrapFactoryFinder() .newInstance(StartupStepRecorder.FACTORY, StartupStepRecorder.class).orElse(null); if (fr != null) { LOG.debug("Discovered startup recorder: {} from classpath", fr); + camelContextExtension.setStartupStepRecorder(fr); startupStepRecorder = fr; } } @@ -2124,7 +2099,7 @@ public void doBuild() throws Exception { // Setup type converter eager as its highly in use and should not be lazy initialized if (eagerCreateTypeConverter()) { StartupStep step5 = startupStepRecorder.beginStep(CamelContext.class, null, "Setting up TypeConverter"); - getOrCreateTypeConverter(); + camelContextExtension.getOrCreateTypeConverter(); startupStepRecorder.endStep(step5); } @@ -2148,10 +2123,11 @@ public void doInit() throws Exception { vetoed = null; + final StartupStepRecorder startupStepRecorder = camelContextExtension.getStartupStepRecorder(); StartupStep step = startupStepRecorder.beginStep(CamelContext.class, null, "Init CamelContext"); // init the route controller - this.routeController = getRouteController(); + final RouteController routeController = getRouteController(); if (startupSummaryLevel == StartupSummaryLevel.Verbose) { // verbose startup should let route controller do the route startup logging if (routeController.getLoggingLevel().ordinal() < LoggingLevel.INFO.ordinal()) { @@ -2160,7 +2136,7 @@ public void doInit() throws Exception { } // init the shutdown strategy - this.shutdownStrategy = getShutdownStrategy(); + final ShutdownStrategy shutdownStrategy = getShutdownStrategy(); if (startupSummaryLevel == StartupSummaryLevel.Verbose) { // verbose startup should let route controller do the route shutdown logging if (shutdownStrategy != null && shutdownStrategy.getLoggingLevel().ordinal() < LoggingLevel.INFO.ordinal()) { @@ -2173,6 +2149,7 @@ public void doInit() throws Exception { // ensure additional type converters is loaded (either if enabled or we should use package scanning from the base) boolean load = loadTypeConverters || camelContextExtension.getBasePackageScan() != null; + final TypeConverter typeConverter = camelContextExtension.getTypeConverter(); if (load && typeConverter instanceof AnnotationScanTypeConverters) { StartupStep step2 = startupStepRecorder.beginStep(CamelContext.class, null, "Scan TypeConverters"); ((AnnotationScanTypeConverters) typeConverter).scanTypeConverters(); @@ -2269,7 +2246,7 @@ public void doInit() throws Exception { // re-create endpoint registry as the cache size limit may be set after the constructor of this instance was called. // and we needed to create endpoints up-front as it may be accessed before this context is started - endpoints = internalServiceManager.addService(createEndpointRegistry(endpoints)); + endpoints = internalServiceManager.addService(this, createEndpointRegistry(endpoints)); // optimised to not include runtimeEndpointRegistry unless startServices // is enabled or JMX statistics is in extended mode @@ -2321,7 +2298,7 @@ && getManagementStrategy().getManagementAgent() != null) { // the method is called start but at this point it will only initialize (as context is starting up) startRouteDefinitions(); // this will init route definitions and populate as route services which we can then initialize now - internalRouteStartupManager.doInitRoutes(routeServices); + internalRouteStartupManager.doInitRoutes(this, routeServices); startupStepRecorder.endStep(subStep); if (!lifecycleStrategies.isEmpty()) { @@ -2364,6 +2341,7 @@ protected void doStart() throws Exception { LOG.warn("Starting CamelContext: {} after the context has been stopped is not recommended", camelContextExtension.getName()); } + final StartupStepRecorder startupStepRecorder = camelContextExtension.getStartupStepRecorder(); StartupStep step = startupStepRecorder.beginStep(CamelContext.class, camelContextExtension.getName(), "Start CamelContext"); @@ -2392,7 +2370,7 @@ protected void doStartContext() throws Exception { stopWatch.restart(); // Start the route controller - startService(this.routeController); + startService(camelContextExtension.getRouteController()); doNotStartRoutesOnFirstStart = !firstStartDone && !isAutoStartup(); @@ -2403,7 +2381,7 @@ protected void doStartContext() throws Exception { // invoke this logic to warm up the routes and if possible also // start the routes try { - internalRouteStartupManager.doStartOrResumeRoutes(routeServices, true, true, false, true); + internalRouteStartupManager.doStartOrResumeRoutes(this, routeServices, true, true, false, true); } catch (Exception e) { throw RuntimeCamelException.wrapRuntimeException(e); } @@ -2605,6 +2583,7 @@ protected void doStartCamel() throws Exception { startService(getManagementStrategy()); // start lifecycle strategies + final StartupStepRecorder startupStepRecorder = camelContextExtension.getStartupStepRecorder(); if (!lifecycleStrategies.isEmpty()) { StartupStep subStep = startupStepRecorder.beginStep(CamelContext.class, camelContextExtension.getName(), @@ -2731,7 +2710,8 @@ protected void doStartCamel() throws Exception { StartupStep subStep = startupStepRecorder.beginStep(CamelContext.class, camelContextExtension.getName(), "Start Routes"); EventHelper.notifyCamelContextRoutesStarting(this); - internalRouteStartupManager.doStartOrResumeRoutes(routeServices, true, !doNotStartRoutesOnFirstStart, false, true); + internalRouteStartupManager.doStartOrResumeRoutes(this, routeServices, true, !doNotStartRoutesOnFirstStart, false, + true); EventHelper.notifyCamelContextRoutesStarted(this); startupStepRecorder.endStep(subStep); } @@ -2762,6 +2742,7 @@ protected void doStartCamel() throws Exception { @Override protected void doStop() throws Exception { stopWatch.restart(); + final ShutdownStrategy shutdownStrategy = getShutdownStrategy(); if (startupSummaryLevel != StartupSummaryLevel.Oneline && startupSummaryLevel != StartupSummaryLevel.Off) { if (shutdownStrategy != null && shutdownStrategy.getTimeUnit() != null) { @@ -2779,7 +2760,7 @@ protected void doStop() throws Exception { EventHelper.notifyCamelContextRoutesStopping(this); // Stop the route controller - ServiceHelper.stopAndShutdownService(this.routeController); + camelContextExtension.stopAndShutdownRouteController(); // stop route inputs in the same order as they were started, so we stop // the very first inputs at first @@ -2802,7 +2783,7 @@ protected void doStop() throws Exception { boolean found = routeStartupOrder.stream().anyMatch(o -> o.getRoute().getId().equals(routeService.getId())); if (!found) { LOG.debug("Route: {} which failed to startup will be stopped", routeService.getId()); - routeStartupOrder.add(internalRouteStartupManager.doPrepareRouteToBeStarted(routeService)); + routeStartupOrder.add(internalRouteStartupManager.doPrepareRouteToBeStarted(this, routeService)); } } @@ -2833,7 +2814,7 @@ protected void doStop() throws Exception { // consumer (eg @Consumer) // which we need to stop after the routes, as a POJO consumer is // essentially a route also - internalServiceManager.stopConsumers(); + internalServiceManager.stopConsumers(this); // the stop order is important @@ -2857,7 +2838,7 @@ protected void doStop() throws Exception { languages.clear(); // shutdown services as late as possible (except type converters as they may be needed during the remainder of the stopping) - internalServiceManager.shutdownServices(); + internalServiceManager.shutdownServices(this); try { for (LifecycleStrategy strategy : lifecycleStrategies) { @@ -2885,13 +2866,13 @@ protected void doStop() throws Exception { // the route back as before // shutdown executor service, reactive executor last - InternalServiceManager.shutdownServices(this, executorServiceManager); + InternalServiceManager.shutdownServices(this, camelContextExtension.getExecutorServiceManager()); InternalServiceManager.shutdownServices(this, camelContextExtension.getReactiveExecutor()); // shutdown type converter and registry as late as possible - ServiceHelper.stopService(typeConverter); - ServiceHelper.stopService(typeConverterRegistry); - ServiceHelper.stopService(camelContextExtension.getRegistry()); + camelContextExtension.stopTypeConverter(); + camelContextExtension.stopTypeConverterRegistry(); + camelContextExtension.stopRegistry(); // stop the lazy created so they can be re-created on restart forceStopLazyInitialization(); @@ -2905,6 +2886,7 @@ protected void doStop() throws Exception { } // ensure any recorder is stopped in case it was kept running + final StartupStepRecorder startupStepRecorder = camelContextExtension.getStartupStepRecorder(); startupStepRecorder.stop(); // and clear start date @@ -2936,6 +2918,7 @@ protected void logRouteStopSummary(LoggingLevel loggingLevel) { int forced = 0; List lines = new ArrayList<>(); + final ShutdownStrategy shutdownStrategy = camelContextExtension.getShutdownStrategy(); if (shutdownStrategy != null && shutdownStrategy.isShutdownRoutesInReverseOrder()) { routeStartupOrder.sort(Comparator.comparingInt(RouteStartupOrder::getStartupOrder).reversed()); } else { @@ -3046,10 +3029,12 @@ public synchronized void startRouteService(RouteService routeService, boolean ad // start the route service routeServices.put(routeService.getId(), routeService); if (shouldStartRoutes()) { + final StartupStepRecorder startupStepRecorder = camelContextExtension.getStartupStepRecorder(); StartupStep step = startupStepRecorder.beginStep(Route.class, routeService.getId(), "Start Route Services"); // this method will log the routes being started - internalRouteStartupManager.safelyStartRouteServices(true, true, true, false, addingRoutes, routeService); + internalRouteStartupManager.safelyStartRouteServices(this, true, true, true, false, addingRoutes, + routeService); // start route services if it was configured to auto startup // and we are not adding routes boolean isAutoStartup = routeService.isAutoStartup(); @@ -3080,7 +3065,7 @@ protected synchronized void resumeRouteService(RouteService routeService) throws // resume the route service if (shouldStartRoutes()) { // this method will log the routes being started - internalRouteStartupManager.safelyStartRouteServices(true, false, true, true, false, routeService); + internalRouteStartupManager.safelyStartRouteServices(this, true, false, true, true, false, routeService); // must resume route service as well routeService.resume(); } @@ -3134,6 +3119,7 @@ protected synchronized void suspendRouteService(RouteService routeService) throw * Force some lazy initialization to occur upfront before we start any components and create routes */ protected void forceLazyInitialization() { + final StartupStepRecorder startupStepRecorder = camelContextExtension.getStartupStepRecorder(); StartupStep step = startupStepRecorder.beginStep(CamelContext.class, camelContextExtension.getName(), "Start Mandatory Services"); initEagerMandatoryServices(); @@ -3179,9 +3165,9 @@ protected void doStartStandardServices() { * Force clear lazy initialization so they can be re-created on restart */ protected void forceStopLazyInitialization() { - injector = null; - typeConverterRegistry = null; - typeConverter = null; + camelContextExtension.resetInjector(); + camelContextExtension.resetTypeConverterRegistry(); + camelContextExtension.resetTypeConverter(); } /** @@ -3453,6 +3439,8 @@ public void setApplicationContextClassLoader(ClassLoader classLoader) { private DataFormat doResolveDataFormat(String name) { StartupStep step = null; + final StartupStepRecorder startupStepRecorder = camelContextExtension.getStartupStepRecorder(); + // only record startup step during startup (not started) if (!isStarted() && startupStepRecorder.isEnabled()) { step = startupStepRecorder.beginStep(DataFormat.class, name, "Resolve DataFormat"); @@ -3488,6 +3476,7 @@ public DataFormat resolveDataFormat(String name) { public DataFormat createDataFormat(String name) { StartupStep step = null; // only record startup step during startup (not started) + final StartupStepRecorder startupStepRecorder = camelContextExtension.getStartupStepRecorder(); if (!isStarted() && startupStepRecorder.isEnabled()) { step = startupStepRecorder.beginStep(DataFormat.class, name, "Create DataFormat"); } @@ -3515,19 +3504,12 @@ public Set getDataFormatNames() { @Override public ShutdownStrategy getShutdownStrategy() { - if (shutdownStrategy == null) { - synchronized (lock) { - if (shutdownStrategy == null) { - setShutdownStrategy(createShutdownStrategy()); - } - } - } - return shutdownStrategy; + return camelContextExtension.getShutdownStrategy(); } @Override public void setShutdownStrategy(ShutdownStrategy shutdownStrategy) { - this.shutdownStrategy = internalServiceManager.addService(shutdownStrategy); + camelContextExtension.setShutdownStrategy(shutdownStrategy); } @Override @@ -3582,21 +3564,12 @@ public void setAutowiredEnabled(Boolean autowiredEnabled) { @Override public ExecutorServiceManager getExecutorServiceManager() { - if (executorServiceManager == null) { - synchronized (lock) { - if (executorServiceManager == null) { - setExecutorServiceManager(createExecutorServiceManager()); - } - } - } - return this.executorServiceManager; + return camelContextExtension.getExecutorServiceManager(); } @Override public void setExecutorServiceManager(ExecutorServiceManager executorServiceManager) { - // special for executorServiceManager as want to stop it manually so - // false in stopOnShutdown - this.executorServiceManager = internalServiceManager.addService(executorServiceManager, false); + camelContextExtension.setExecutorServiceManager(executorServiceManager); } @Override @@ -3624,7 +3597,7 @@ public void setDebugger(Debugger debugger) { if (isStartingOrStarted()) { throw new IllegalStateException("Cannot set debugger on a started CamelContext"); } - this.debugger = internalServiceManager.addService(debugger, true, false, true); + this.debugger = internalServiceManager.addService(this, debugger, true, false, true); } @Override diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java index af24a796b0f08..e343e51536ef4 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java @@ -285,7 +285,7 @@ public boolean process(Exchange exchange, AsyncCallback originalCallback) { // in between the: // CAMEL END USER - DEBUG ME HERE +++ START +++ // CAMEL END USER - DEBUG ME HERE +++ END +++ - // you can see in the code below. + // you can see in the code below within the processTransacted or processNonTransacted methods. // ---------------------------------------------------------- if (processor == null || exchange.isRouteStop()) { @@ -295,15 +295,7 @@ public boolean process(Exchange exchange, AsyncCallback originalCallback) { } if (shutdownStrategy.isForceShutdown()) { - String msg = "Run not allowed as ShutdownStrategy is forcing shutting down, will reject executing exchange: " - + exchange; - LOG.debug(msg); - if (exchange.getException() == null) { - exchange.setException(new RejectedExecutionException(msg)); - } - // force shutdown so we should not continue - originalCallback.done(true); - return true; + return processShutdown(exchange, originalCallback); } Object[] states; @@ -328,76 +320,104 @@ public boolean process(Exchange exchange, AsyncCallback originalCallback) { states[j++] = state; } } catch (Exception e) { - // error in before so break out - exchange.setException(e); - try { - originalCallback.done(true); - } finally { - // task is done so reset - if (taskFactory != null) { - taskFactory.release(afterTask); - } - } - return true; + return handleException(exchange, originalCallback, e, afterTask); } } if (exchange.isTransacted()) { - // must be synchronized for transacted exchanges - if (LOG.isTraceEnabled()) { - LOG.trace("Transacted Exchange must be routed synchronously for exchangeId: {} -> {}", exchange.getExchangeId(), - exchange); - } - try { - // ---------------------------------------------------------- - // CAMEL END USER - DEBUG ME HERE +++ START +++ - // ---------------------------------------------------------- - processor.process(exchange); - // ---------------------------------------------------------- - // CAMEL END USER - DEBUG ME HERE +++ END +++ - // ---------------------------------------------------------- - } catch (Exception e) { - exchange.setException(e); - } finally { - // processing is done - afterTask.done(true); - } - // we are done synchronously - must return true - return true; + return processTransacted(exchange, afterTask); } else { - final UnitOfWork uow = exchange.getUnitOfWork(); + return processNonTransacted(exchange, afterTask); + } + } - // optimize to only do before uow processing if really needed - AsyncCallback async = afterTask; - boolean beforeAndAfter = uow != null && uow.isBeforeAfterProcess(); - if (beforeAndAfter) { - async = uow.beforeProcess(processor, exchange, async); - } + private static boolean processShutdown(Exchange exchange, AsyncCallback originalCallback) { + String msg = "Run not allowed as ShutdownStrategy is forcing shutting down, will reject executing exchange: " + + exchange; + LOG.debug(msg); + if (exchange.getException() == null) { + exchange.setException(new RejectedExecutionException(msg)); + } + // force shutdown so we should not continue + originalCallback.done(true); + return true; + } + + private boolean processNonTransacted(Exchange exchange, CamelInternalTask afterTask) { + final AsyncCallback async = beforeProcess(exchange, afterTask); + + // ---------------------------------------------------------- + // CAMEL END USER - DEBUG ME HERE +++ START +++ + // ---------------------------------------------------------- + if (LOG.isTraceEnabled()) { + LOG.trace("Processing exchange for exchangeId: {} -> {}", exchange.getExchangeId(), exchange); + } + boolean sync = processor.process(exchange, async); + if (!sync) { + EventHelper.notifyExchangeAsyncProcessingStartedEvent(camelContext, exchange); + } + + // ---------------------------------------------------------- + // CAMEL END USER - DEBUG ME HERE +++ END +++ + // ---------------------------------------------------------- + // CAMEL-18255: move uow.afterProcess handling to the callback + + if (LOG.isTraceEnabled()) { + LOG.trace("Exchange processed and is continued routed {} for exchangeId: {} -> {}", + sync ? "synchronously" : "asynchronously", + exchange.getExchangeId(), exchange); + } + return sync; + } + + private AsyncCallback beforeProcess(Exchange exchange, CamelInternalTask afterTask) { + final UnitOfWork uow = exchange.getUnitOfWork(); + + // optimize to only do before uow processing if really needed + if (uow != null && uow.isBeforeAfterProcess()) { + return uow.beforeProcess(processor, exchange, afterTask); + } + return afterTask; + } + + private boolean processTransacted(Exchange exchange, CamelInternalTask afterTask) { + // must be synchronized for transacted exchanges + if (LOG.isTraceEnabled()) { + LOG.trace("Transacted Exchange must be routed synchronously for exchangeId: {} -> {}", exchange.getExchangeId(), + exchange); + } + try { // ---------------------------------------------------------- // CAMEL END USER - DEBUG ME HERE +++ START +++ // ---------------------------------------------------------- - if (LOG.isTraceEnabled()) { - LOG.trace("Processing exchange for exchangeId: {} -> {}", exchange.getExchangeId(), exchange); - } - boolean sync = processor.process(exchange, async); - if (!sync) { - EventHelper.notifyExchangeAsyncProcessingStartedEvent(camelContext, exchange); - } - + processor.process(exchange); // ---------------------------------------------------------- // CAMEL END USER - DEBUG ME HERE +++ END +++ // ---------------------------------------------------------- + } catch (Exception e) { + exchange.setException(e); + } finally { + // processing is done + afterTask.done(true); + } + // we are done synchronously - must return true + return true; + } - // CAMEL-18255: move uow.afterProcess handling to the callback - - if (LOG.isTraceEnabled()) { - LOG.trace("Exchange processed and is continued routed {} for exchangeId: {} -> {}", - sync ? "synchronously" : "asynchronously", - exchange.getExchangeId(), exchange); + private boolean handleException( + Exchange exchange, AsyncCallback originalCallback, Exception e, CamelInternalTask afterTask) { + // error in before so break out + exchange.setException(e); + try { + originalCallback.done(true); + } finally { + // task is done so reset + if (taskFactory != null) { + taskFactory.release(afterTask); } - return sync; } + return true; } @Override @@ -820,9 +840,10 @@ public UnitOfWork before(Exchange exchange) throws Exception { } // only return UnitOfWork if we created a new as then its us that handle the lifecycle to done the created UoW - UnitOfWork created = null; + UnitOfWork uow = exchange.getUnitOfWork(); + UnitOfWork created = null; if (uow == null) { // If there is no existing UoW, then we should start one and // terminate it once processing is completed for the exchange. diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultCamelContextExtension.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultCamelContextExtension.java index e01231b5f1203..ba5683459c56b 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultCamelContextExtension.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultCamelContextExtension.java @@ -36,6 +36,7 @@ import org.apache.camel.Route; import org.apache.camel.RuntimeCamelException; import org.apache.camel.Service; +import org.apache.camel.TypeConverter; import org.apache.camel.spi.BootstrapCloseable; import org.apache.camel.spi.CamelContextNameStrategy; import org.apache.camel.spi.ClassResolver; @@ -46,9 +47,11 @@ import org.apache.camel.spi.EventNotifier; import org.apache.camel.spi.ExchangeFactory; import org.apache.camel.spi.ExchangeFactoryManager; +import org.apache.camel.spi.ExecutorServiceManager; import org.apache.camel.spi.FactoryFinder; import org.apache.camel.spi.HeadersMapFactory; import org.apache.camel.spi.InflightRepository; +import org.apache.camel.spi.Injector; import org.apache.camel.spi.InterceptStrategy; import org.apache.camel.spi.LifecycleStrategy; import org.apache.camel.spi.LogListener; @@ -68,15 +71,19 @@ import org.apache.camel.spi.RestRegistryFactory; import org.apache.camel.spi.RouteController; import org.apache.camel.spi.RouteStartupOrder; +import org.apache.camel.spi.ShutdownStrategy; import org.apache.camel.spi.StartupStepRecorder; import org.apache.camel.spi.StreamCachingStrategy; import org.apache.camel.spi.Tracer; import org.apache.camel.spi.TransformerRegistry; +import org.apache.camel.spi.TypeConverterRegistry; import org.apache.camel.spi.UuidGenerator; import org.apache.camel.spi.ValidatorRegistry; import org.apache.camel.support.EndpointHelper; import org.apache.camel.support.NormalizedUri; import org.apache.camel.support.PluginHelper; +import org.apache.camel.support.service.ServiceHelper; +import org.apache.camel.support.startup.DefaultStartupStepRecorder; import org.apache.camel.util.StringHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -118,6 +125,15 @@ class DefaultCamelContextExtension implements ExtendedCamelContext { private volatile Tracer tracer; private volatile TransformerRegistry transformerRegistry; private volatile ValidatorRegistry validatorRegistry; + private volatile TypeConverterRegistry typeConverterRegistry; + private volatile TypeConverter typeConverter; + private volatile RouteController routeController; + private volatile ShutdownStrategy shutdownStrategy; + private volatile ExecutorServiceManager executorServiceManager; + + private volatile Injector injector; + + private volatile StartupStepRecorder startupStepRecorder = new DefaultStartupStepRecorder(); @Deprecated private ErrorHandlerFactory errorHandlerFactory; @@ -154,7 +170,7 @@ CamelContextNameStrategy getNameStrategy() { } void setNameStrategy(CamelContextNameStrategy nameStrategy) { - this.nameStrategy = camelContext.getInternalServiceManager().addService(nameStrategy); + this.nameStrategy = camelContext.getInternalServiceManager().addService(camelContext, nameStrategy); } ManagementNameStrategy getManagementNameStrategy() { @@ -169,7 +185,7 @@ ManagementNameStrategy getManagementNameStrategy() { } void setManagementNameStrategy(ManagementNameStrategy managementNameStrategy) { - this.managementNameStrategy = camelContext.getInternalServiceManager().addService(managementNameStrategy); + this.managementNameStrategy = camelContext.getInternalServiceManager().addService(camelContext, managementNameStrategy); } PropertiesComponent getPropertiesComponent() { @@ -184,7 +200,7 @@ PropertiesComponent getPropertiesComponent() { } void setPropertiesComponent(PropertiesComponent propertiesComponent) { - this.propertiesComponent = camelContext.getInternalServiceManager().addService(propertiesComponent); + this.propertiesComponent = camelContext.getInternalServiceManager().addService(camelContext, propertiesComponent); } @Override @@ -326,7 +342,12 @@ public ManagementMBeanAssembler getManagementMBeanAssembler() { } void setManagementMBeanAssembler(ManagementMBeanAssembler managementMBeanAssembler) { - this.managementMBeanAssembler = camelContext.getInternalServiceManager().addService(managementMBeanAssembler, false); + this.managementMBeanAssembler + = camelContext.getInternalServiceManager().addService(camelContext, managementMBeanAssembler, false); + } + + void stopRegistry() { + ServiceHelper.stopService(registry); } @Override @@ -506,7 +527,7 @@ public HeadersMapFactory getHeadersMapFactory() { @Override public void setHeadersMapFactory(HeadersMapFactory headersMapFactory) { - this.headersMapFactory = camelContext.getInternalServiceManager().addService(headersMapFactory); + this.headersMapFactory = camelContext.getInternalServiceManager().addService(camelContext, headersMapFactory); } void initEagerMandatoryServices(boolean caseInsensitive, Supplier headersMapFactorySupplier) { @@ -559,7 +580,7 @@ public ExchangeFactoryManager getExchangeFactoryManager() { @Override public void setExchangeFactoryManager(ExchangeFactoryManager exchangeFactoryManager) { - this.exchangeFactoryManager = camelContext.getInternalServiceManager().addService(exchangeFactoryManager); + this.exchangeFactoryManager = camelContext.getInternalServiceManager().addService(camelContext, exchangeFactoryManager); } @Override @@ -597,7 +618,7 @@ public ReactiveExecutor getReactiveExecutor() { public void setReactiveExecutor(ReactiveExecutor reactiveExecutor) { // special for executorServiceManager as want to stop it manually so // false in stopOnShutdown - this.reactiveExecutor = camelContext.getInternalServiceManager().addService(reactiveExecutor, false); + this.reactiveExecutor = camelContext.getInternalServiceManager().addService(camelContext, reactiveExecutor, false); } RestRegistryFactory getRestRegistryFactory() { @@ -612,7 +633,7 @@ RestRegistryFactory getRestRegistryFactory() { } void setRestRegistryFactory(RestRegistryFactory restRegistryFactory) { - this.restRegistryFactory = camelContext.getInternalServiceManager().addService(restRegistryFactory); + this.restRegistryFactory = camelContext.getInternalServiceManager().addService(camelContext, restRegistryFactory); } RestRegistry getRestRegistry() { @@ -627,7 +648,7 @@ RestRegistry getRestRegistry() { } void setRestRegistry(RestRegistry restRegistry) { - this.restRegistry = camelContext.getInternalServiceManager().addService(restRegistry); + this.restRegistry = camelContext.getInternalServiceManager().addService(camelContext, restRegistry); } RestConfiguration getRestConfiguration() { @@ -657,7 +678,7 @@ ClassResolver getClassResolver() { } void setClassResolver(ClassResolver classResolver) { - this.classResolver = camelContext.getInternalServiceManager().addService(classResolver); + this.classResolver = camelContext.getInternalServiceManager().addService(camelContext, classResolver); } MessageHistoryFactory getMessageHistoryFactory() { @@ -672,7 +693,7 @@ MessageHistoryFactory getMessageHistoryFactory() { } void setMessageHistoryFactory(MessageHistoryFactory messageHistoryFactory) { - this.messageHistoryFactory = camelContext.getInternalServiceManager().addService(messageHistoryFactory); + this.messageHistoryFactory = camelContext.getInternalServiceManager().addService(camelContext, messageHistoryFactory); } StreamCachingStrategy getStreamCachingStrategy() { @@ -688,7 +709,7 @@ StreamCachingStrategy getStreamCachingStrategy() { void setStreamCachingStrategy(StreamCachingStrategy streamCachingStrategy) { this.streamCachingStrategy - = camelContext.getInternalServiceManager().addService(streamCachingStrategy, true, false, true); + = camelContext.getInternalServiceManager().addService(camelContext, streamCachingStrategy, true, false, true); } InflightRepository getInflightRepository() { @@ -703,7 +724,7 @@ InflightRepository getInflightRepository() { } void setInflightRepository(InflightRepository repository) { - this.inflightRepository = camelContext.getInternalServiceManager().addService(repository); + this.inflightRepository = camelContext.getInternalServiceManager().addService(camelContext, repository); } UuidGenerator getUuidGenerator() { @@ -718,7 +739,7 @@ UuidGenerator getUuidGenerator() { } void setUuidGenerator(UuidGenerator uuidGenerator) { - this.uuidGenerator = camelContext.getInternalServiceManager().addService(uuidGenerator); + this.uuidGenerator = camelContext.getInternalServiceManager().addService(camelContext, uuidGenerator); } Tracer getTracer() { @@ -733,7 +754,7 @@ Tracer getTracer() { } void setTracer(Tracer tracer) { - this.tracer = camelContext.getInternalServiceManager().addService(tracer, true, false, true); + this.tracer = camelContext.getInternalServiceManager().addService(camelContext, tracer, true, false, true); } TransformerRegistry getTransformerRegistry() { @@ -748,7 +769,7 @@ TransformerRegistry getTransformerRegistry() { } void setTransformerRegistry(TransformerRegistry transformerRegistry) { - this.transformerRegistry = camelContext.getInternalServiceManager().addService(transformerRegistry); + this.transformerRegistry = camelContext.getInternalServiceManager().addService(camelContext, transformerRegistry); } ValidatorRegistry getValidatorRegistry() { @@ -763,7 +784,133 @@ ValidatorRegistry getValidatorRegistry() { } public void setValidatorRegistry(ValidatorRegistry validatorRegistry) { - this.validatorRegistry = camelContext.getInternalServiceManager().addService(validatorRegistry); + this.validatorRegistry = camelContext.getInternalServiceManager().addService(camelContext, validatorRegistry); + } + + void stopTypeConverterRegistry() { + ServiceHelper.stopService(typeConverterRegistry); + } + + void resetTypeConverterRegistry() { + typeConverterRegistry = null; + } + + TypeConverterRegistry getTypeConverterRegistry() { + if (typeConverterRegistry == null) { + synchronized (lock) { + if (typeConverterRegistry == null) { + setTypeConverterRegistry(camelContext.createTypeConverterRegistry()); + + // some registries are also a type converter implementation + if (typeConverterRegistry instanceof TypeConverter newTypeConverter) { + setTypeConverter(newTypeConverter); + } + } + } + } + return typeConverterRegistry; + } + + void setTypeConverterRegistry(TypeConverterRegistry typeConverterRegistry) { + this.typeConverterRegistry = camelContext.getInternalServiceManager().addService(camelContext, typeConverterRegistry); + } + + void stopTypeConverter() { + ServiceHelper.stopService(typeConverter); + } + + void resetTypeConverter() { + typeConverter = null; + } + + TypeConverter getTypeConverter() { + return typeConverter; + } + + void setTypeConverter(TypeConverter typeConverter) { + this.typeConverter = camelContext.getInternalServiceManager().addService(camelContext, typeConverter); + } + + TypeConverter getOrCreateTypeConverter() { + if (typeConverter == null) { + synchronized (lock) { + if (typeConverter == null) { + setTypeConverter(camelContext.createTypeConverter()); + } + } + } + return typeConverter; + } + + void resetInjector() { + injector = null; + } + + Injector getInjector() { + if (injector == null) { + synchronized (lock) { + if (injector == null) { + setInjector(camelContext.createInjector()); + } + } + } + return injector; + } + + void setInjector(Injector injector) { + this.injector = camelContext.getInternalServiceManager().addService(camelContext, injector); + } + + void stopAndShutdownRouteController() { + ServiceHelper.stopAndShutdownService(this.routeController); + } + + RouteController getRouteController() { + if (routeController == null) { + synchronized (lock) { + if (routeController == null) { + setRouteController(camelContext.createRouteController()); + } + } + } + return routeController; + } + + void setRouteController(RouteController routeController) { + this.routeController = camelContext.getInternalServiceManager().addService(camelContext, routeController); + } + + ShutdownStrategy getShutdownStrategy() { + if (shutdownStrategy == null) { + synchronized (lock) { + if (shutdownStrategy == null) { + setShutdownStrategy(camelContext.createShutdownStrategy()); + } + } + } + return shutdownStrategy; + } + + void setShutdownStrategy(ShutdownStrategy shutdownStrategy) { + this.shutdownStrategy = camelContext.getInternalServiceManager().addService(camelContext, shutdownStrategy); + } + + ExecutorServiceManager getExecutorServiceManager() { + if (executorServiceManager == null) { + synchronized (lock) { + if (executorServiceManager == null) { + setExecutorServiceManager(camelContext.createExecutorServiceManager()); + } + } + } + return this.executorServiceManager; + } + + void setExecutorServiceManager(ExecutorServiceManager executorServiceManager) { + // special for executorServiceManager as want to stop it manually so + // false in stopOnShutdown + this.executorServiceManager + = camelContext.getInternalServiceManager().addService(camelContext, executorServiceManager, false); } @Override @@ -778,12 +925,12 @@ public EndpointUriFactory getEndpointUriFactory(String scheme) { @Override public StartupStepRecorder getStartupStepRecorder() { - return camelContext.startupStepRecorder; + return startupStepRecorder; } @Override public void setStartupStepRecorder(StartupStepRecorder startupStepRecorder) { - camelContext.startupStepRecorder = startupStepRecorder; + this.startupStepRecorder = startupStepRecorder; } @Override @@ -835,7 +982,7 @@ public T getContextPlugin(Class type) { @Override public void addContextPlugin(Class type, T module) { - final T addedModule = camelContext.getInternalServiceManager().addService(module); + final T addedModule = camelContext.getInternalServiceManager().addService(camelContext, module); pluginManager.addContextPlugin(type, addedModule); } @@ -847,7 +994,7 @@ public void lazyAddContextPlugin(Class type, Supplier module) { private T lazyInitAndAdd(Supplier supplier) { T module = supplier.get(); - return camelContext.getInternalServiceManager().addService(module); + return camelContext.getInternalServiceManager().addService(camelContext, module); } /* diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultContextPluginManager.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultContextPluginManager.java index 79b9e06ca5a39..f33ad017b612b 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultContextPluginManager.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultContextPluginManager.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.camel.impl.engine; import java.util.Map; @@ -25,11 +24,8 @@ import org.apache.camel.spi.PluginManager; public class DefaultContextPluginManager implements PluginManager { - private final Map, Object> extensions = new ConcurrentHashMap<>(); - - public DefaultContextPluginManager() { - } + private final Map, Object> extensions = new ConcurrentHashMap<>(); @Override public T getContextPlugin(Class type) { diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultInjector.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultInjector.java index 332dd05b64fac..a8d9e67ecbcff 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultInjector.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultInjector.java @@ -34,8 +34,8 @@ public class DefaultInjector implements Injector { // use the reflection injector - private final CamelContext camelContext; - private final CamelBeanPostProcessor postProcessor; + protected final CamelContext camelContext; + protected final CamelBeanPostProcessor postProcessor; public DefaultInjector(CamelContext context) { this.camelContext = context; @@ -78,12 +78,7 @@ public T newInstance(Class type, boolean postProcessBean) { // inject camel context if needed CamelContextAware.trySetCamelContext(answer, camelContext); if (postProcessBean) { - try { - postProcessor.postProcessBeforeInitialization(answer, answer.getClass().getName()); - postProcessor.postProcessAfterInitialization(answer, answer.getClass().getName()); - } catch (Exception e) { - throw new RuntimeCamelException("Error during post processing of bean: " + answer, e); - } + applyBeanPostProcessing(answer); } return answer; } @@ -92,4 +87,13 @@ public T newInstance(Class type, boolean postProcessBean) { public boolean supportsAutoWiring() { return false; } + + protected void applyBeanPostProcessing(T bean) { + try { + postProcessor.postProcessBeforeInitialization(bean, bean.getClass().getName()); + postProcessor.postProcessAfterInitialization(bean, bean.getClass().getName()); + } catch (Exception e) { + throw new RuntimeCamelException("Error during post processing of bean: " + bean, e); + } + } } diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java index b0f2806917bad..596565f465105 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java @@ -136,37 +136,37 @@ void schedule(Runnable runnable, boolean first, boolean main, boolean sync) { LOG.trace("Schedule [first={}, main={}, sync={}]: {}", first, main, sync, runnable); } if (main) { - if (!queue.isEmpty()) { - if (back == null) { - back = new ArrayDeque<>(); - } - back.push(queue); - queue = new ArrayDeque<>(); - } + executeMainFlow(); } if (first) { queue.addFirst(runnable); - if (stats) { - executor.pendingTasks.increment(); - } } else { queue.addLast(runnable); - if (stats) { - executor.pendingTasks.increment(); + } + + incrementPendingTasks(); + tryExecuteReactiveWork(runnable, sync); + } + + private void executeMainFlow() { + if (!queue.isEmpty()) { + if (back == null) { + back = new ArrayDeque<>(); } + back.push(queue); + queue = new ArrayDeque<>(); } + } + + private void tryExecuteReactiveWork(Runnable runnable, boolean sync) { if (!running || sync) { running = true; - if (stats) { - executor.runningWorkers.increment(); - } + incrementRunningWorkers(); try { executeReactiveWork(); } finally { running = false; - if (stats) { - executor.runningWorkers.decrement(); - } + decrementRunningWorkers(); } } else { if (LOG.isTraceEnabled()) { @@ -186,18 +186,44 @@ private void executeReactiveWork() { break; } } - try { - if (stats) { - executor.pendingTasks.decrement(); - } - if (LOG.isTraceEnabled()) { - LOG.trace("Worker #{} running: {}", number, polled); - } - polled.run(); - } catch (Exception t) { - LOG.warn("Error executing reactive work due to {}. This exception is ignored.", - t.getMessage(), t); + doRun(polled); + } + } + + private void doRun(Runnable polled) { + try { + decrementPendingTasks(); + if (LOG.isTraceEnabled()) { + LOG.trace("Worker #{} running: {}", number, polled); } + polled.run(); + } catch (Exception t) { + LOG.warn("Error executing reactive work due to {}. This exception is ignored.", + t.getMessage(), t); + } + } + + private void decrementRunningWorkers() { + if (stats) { + executor.runningWorkers.decrement(); + } + } + + private void incrementRunningWorkers() { + if (stats) { + executor.runningWorkers.increment(); + } + } + + private void incrementPendingTasks() { + if (stats) { + executor.pendingTasks.increment(); + } + } + + private void decrementPendingTasks() { + if (stats) { + executor.pendingTasks.decrement(); } } @@ -207,9 +233,7 @@ boolean executeFromQueue() { return false; } try { - if (stats) { - executor.pendingTasks.decrement(); - } + decrementPendingTasks(); if (LOG.isTraceEnabled()) { LOG.trace("Running: {}", polled); } diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultShutdownStrategy.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultShutdownStrategy.java index ff5beac8897e9..b6c4123eca8a5 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultShutdownStrategy.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultShutdownStrategy.java @@ -199,15 +199,11 @@ protected boolean doShutdown( routesOrdered.sort(comparator); if (logger.shouldLog()) { - if (suspendOnly) { - String msg = String.format("Starting to graceful suspend %s routes (timeout %s %s)", routesOrdered.size(), - timeout, timeUnit.toString().toLowerCase(Locale.ENGLISH)); - logger.log(msg); - } else { - String msg = String.format("Starting to graceful shutdown %s routes (timeout %s %s)", routesOrdered.size(), - timeout, timeUnit.toString().toLowerCase(Locale.ENGLISH)); - logger.log(msg); - } + final String action = suspendOnly ? "suspend" : "shutdown"; + + String msg = String.format("Starting to graceful %s %s routes (timeout %s %s)", action, routesOrdered.size(), + timeout, timeUnit.toString().toLowerCase(Locale.ENGLISH)); + logger.log(msg); } // use another thread to perform the shutdowns so we can support timeout diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultStreamCachingStrategy.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultStreamCachingStrategy.java index f7513cb65f240..4c0919f7b7e83 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultStreamCachingStrategy.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultStreamCachingStrategy.java @@ -260,21 +260,7 @@ public StreamCache cache(Message message) { if (body != null) { boolean allowed = allowClasses == null && denyClasses == null; if (!allowed) { - Class source = body.getClass(); - if (denyClasses != null && allowClasses != null) { - // deny takes precedence - allowed = !isAssignableFrom(source, denyClasses); - if (allowed) { - allowed = isAssignableFrom(source, allowClasses); - } - } else if (denyClasses != null) { - allowed = !isAssignableFrom(source, denyClasses); - } else { - allowed = isAssignableFrom(source, allowClasses); - } - if (LOG.isTraceEnabled()) { - LOG.trace("Cache stream from class: {} is {}", source, allowed ? "allowed" : "denied"); - } + allowed = checkAllowDenyList(body); } if (allowed) { cache = camelContext.getTypeConverter().convertTo(StreamCache.class, message.getExchange(), body); @@ -285,20 +271,44 @@ public StreamCache cache(Message message) { LOG.trace("Cached stream to {} -> {}", cache.inMemory() ? "memory" : "spool", cache); } if (statistics.isStatisticsEnabled()) { - try { - if (cache.inMemory()) { - statistics.updateMemory(cache.length()); - } else { - statistics.updateSpool(cache.length()); - } - } catch (Exception e) { - LOG.debug("Error updating cache statistics. This exception is ignored.", e); - } + computeStatistics(cache); } } return cache; } + private boolean checkAllowDenyList(Object body) { + boolean allowed; + Class source = body.getClass(); + if (denyClasses != null && allowClasses != null) { + // deny takes precedence + allowed = !isAssignableFrom(source, denyClasses); + if (allowed) { + allowed = isAssignableFrom(source, allowClasses); + } + } else if (denyClasses != null) { + allowed = !isAssignableFrom(source, denyClasses); + } else { + allowed = isAssignableFrom(source, allowClasses); + } + if (LOG.isTraceEnabled()) { + LOG.trace("Cache stream from class: {} is {}", source, allowed ? "allowed" : "denied"); + } + return allowed; + } + + private void computeStatistics(StreamCache cache) { + try { + if (cache.inMemory()) { + statistics.updateMemory(cache.length()); + } else { + statistics.updateSpool(cache.length()); + } + } catch (Exception e) { + LOG.debug("Error updating cache statistics. This exception is ignored.", e); + } + } + protected static boolean isAssignableFrom(Class source, Collection> targets) { for (Class target : targets) { if (target.isAssignableFrom(source)) { diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/InternalRouteStartupManager.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/InternalRouteStartupManager.java index 9ec0a6fff4d73..2a8e9839f3d53 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/InternalRouteStartupManager.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/InternalRouteStartupManager.java @@ -50,17 +50,13 @@ *

* This code has been refactored out of {@link AbstractCamelContext} to its own class. */ -class InternalRouteStartupManager { +final class InternalRouteStartupManager { private static final Logger LOG = LoggerFactory.getLogger(InternalRouteStartupManager.class); private final ThreadLocal setupRoute = new ThreadLocal<>(); - private final AbstractCamelContext abstractCamelContext; private final CamelLogger routeLogger = new CamelLogger(LOG); - - public InternalRouteStartupManager(AbstractCamelContext abstractCamelContext) { - this.abstractCamelContext = abstractCamelContext; - } + private int defaultRouteStartupOrder = 1000; /** * If Camel is currently starting up a route then this returns the route. @@ -75,13 +71,13 @@ public Route getSetupRoute() { * @param routeServices the routes to initialize * @throws Exception is thrown if error initializing routes */ - protected void doInitRoutes(Map routeServices) + public void doInitRoutes(AbstractCamelContext camelContext, Map routeServices) throws Exception { - abstractCamelContext.setStartingRoutes(true); + camelContext.setStartingRoutes(true); try { for (RouteService routeService : routeServices.values()) { - StartupStep step = abstractCamelContext.getStartupStepRecorder().beginStep(Route.class, routeService.getId(), + StartupStep step = camelContext.getStartupStepRecorder().beginStep(Route.class, routeService.getId(), "Init Route"); try { LOG.debug("Initializing route id: {}", routeService.getId()); @@ -92,11 +88,11 @@ protected void doInitRoutes(Map routeServices) routeService.setUp(); } finally { setupRoute.remove(); - abstractCamelContext.getStartupStepRecorder().endStep(step); + camelContext.getStartupStepRecorder().endStep(step); } } } finally { - abstractCamelContext.setStartingRoutes(false); + camelContext.setStartingRoutes(false); } } @@ -112,10 +108,11 @@ protected void doInitRoutes(Map routeServices) * @throws Exception is thrown if error starting routes */ protected void doStartOrResumeRoutes( + AbstractCamelContext camelContext, Map routeServices, boolean checkClash, boolean startConsumer, boolean resumeConsumer, boolean addingRoutes) throws Exception { - abstractCamelContext.setStartingRoutes(true); + camelContext.setStartingRoutes(true); try { // filter out already started routes Map filtered = new LinkedHashMap<>(); @@ -144,10 +141,10 @@ protected void doStartOrResumeRoutes( } // the context is in last phase of staring, so lets start the routes - safelyStartRouteServices(checkClash, startConsumer, resumeConsumer, addingRoutes, filtered.values()); + safelyStartRouteServices(camelContext, checkClash, startConsumer, resumeConsumer, addingRoutes, filtered.values()); } finally { - abstractCamelContext.setStartingRoutes(false); + camelContext.setStartingRoutes(false); } } @@ -166,6 +163,7 @@ protected void doStartOrResumeRoutes( * @throws Exception is thrown if error starting the routes */ protected synchronized void safelyStartRouteServices( + AbstractCamelContext camelContext, boolean checkClash, boolean startConsumer, boolean resumeConsumer, boolean addingRoutes, Collection routeServices) throws Exception { @@ -177,77 +175,79 @@ protected synchronized void safelyStartRouteServices( // figure out the order in which the routes should be started for (RouteService routeService : routeServices) { - DefaultRouteStartupOrder order = doPrepareRouteToBeStarted(routeService); + DefaultRouteStartupOrder order = doPrepareRouteToBeStarted(camelContext, routeService); // check for clash before we add it as input if (checkClash) { - doCheckStartupOrderClash(order, inputs); + doCheckStartupOrderClash(camelContext, order, inputs); } inputs.put(order.getStartupOrder(), order); } // warm up routes before we start them - doWarmUpRoutes(inputs, startConsumer); + doWarmUpRoutes(camelContext, inputs, startConsumer); // sort the startup listeners so they are started in the right order - abstractCamelContext.getStartupListeners().sort(OrderedComparator.get()); + camelContext.getStartupListeners().sort(OrderedComparator.get()); // now call the startup listeners where the routes has been warmed up // (only the actual route consumer has not yet been started) - for (StartupListener startup : abstractCamelContext.getStartupListeners()) { - startup.onCamelContextStarted(abstractCamelContext.getCamelContextReference(), abstractCamelContext.isStarted()); + for (StartupListener startup : camelContext.getStartupListeners()) { + startup.onCamelContextStarted(camelContext.getCamelContextReference(), camelContext.isStarted()); } // because the consumers may also register startup listeners we need to // reset // the already started listeners - List backup = new ArrayList<>(abstractCamelContext.getStartupListeners()); - abstractCamelContext.getStartupListeners().clear(); + List backup = new ArrayList<>(camelContext.getStartupListeners()); + camelContext.getStartupListeners().clear(); // now start the consumers if (startConsumer) { if (resumeConsumer) { // and now resume the routes - doResumeRouteConsumers(inputs, addingRoutes); + doResumeRouteConsumers(camelContext, inputs, addingRoutes); } else { // and now start the routes // and check for clash with multiple consumers of the same // endpoints which is not allowed - doStartRouteConsumers(inputs, addingRoutes); + doStartRouteConsumers(camelContext, inputs, addingRoutes); } } // sort the startup listeners so they are started in the right order - abstractCamelContext.getStartupListeners().sort(OrderedComparator.get()); + camelContext.getStartupListeners().sort(OrderedComparator.get()); // now the consumers that was just started may also add new // StartupListeners (such as timer) // so we need to ensure they get started as well - for (StartupListener startup : abstractCamelContext.getStartupListeners()) { - startup.onCamelContextStarted(abstractCamelContext.getCamelContextReference(), abstractCamelContext.isStarted()); + for (StartupListener startup : camelContext.getStartupListeners()) { + startup.onCamelContextStarted(camelContext.getCamelContextReference(), camelContext.isStarted()); } // and add the previous started startup listeners to the list so we have // them all - abstractCamelContext.getStartupListeners().addAll(0, backup); + camelContext.getStartupListeners().addAll(0, backup); // inputs no longer needed inputs.clear(); } /** - * @see #safelyStartRouteServices(boolean, boolean, boolean, boolean, Collection) + * @see #safelyStartRouteServices(AbstractCamelContext, boolean, boolean, boolean, boolean, Collection) */ - protected synchronized void safelyStartRouteServices( + public synchronized void safelyStartRouteServices( + AbstractCamelContext camelContext, boolean forceAutoStart, boolean checkClash, boolean startConsumer, boolean resumeConsumer, boolean addingRoutes, RouteService... routeServices) throws Exception { - safelyStartRouteServices(checkClash, startConsumer, resumeConsumer, addingRoutes, Arrays.asList(routeServices)); + safelyStartRouteServices(camelContext, checkClash, startConsumer, resumeConsumer, addingRoutes, + Arrays.asList(routeServices)); } - DefaultRouteStartupOrder doPrepareRouteToBeStarted(RouteService routeService) { + DefaultRouteStartupOrder doPrepareRouteToBeStarted(AbstractCamelContext camelContext, RouteService routeService) { // add the inputs from this route service to the list to start // afterwards // should be ordered according to the startup number Integer startupOrder = routeService.getRoute().getStartupOrder(); if (startupOrder == null) { // auto assign a default startup order - startupOrder = abstractCamelContext.defaultRouteStartupOrder++; + startupOrder = defaultRouteStartupOrder++; } // create holder object that contains information about this route to be @@ -256,7 +256,8 @@ DefaultRouteStartupOrder doPrepareRouteToBeStarted(RouteService routeService) { return new DefaultRouteStartupOrder(startupOrder, route, routeService); } - boolean doCheckStartupOrderClash(DefaultRouteStartupOrder answer, Map inputs) + boolean doCheckStartupOrderClash( + AbstractCamelContext camelContext, DefaultRouteStartupOrder answer, Map inputs) throws FailedToStartRouteException { // check for clash by startupOrder id DefaultRouteStartupOrder other = inputs.get(answer.getStartupOrder()); @@ -267,7 +268,7 @@ boolean doCheckStartupOrderClash(DefaultRouteStartupOrder answer, Map inputs, boolean autoStartup) throws FailedToStartRouteException { + void doWarmUpRoutes(AbstractCamelContext camelContext, Map inputs, boolean autoStartup) + throws FailedToStartRouteException { // now prepare the routes by starting its services before we start the // input for (Map.Entry entry : inputs.entrySet()) { @@ -296,7 +298,7 @@ void doWarmUpRoutes(Map inputs, boolean autoS // will then be prepared in time before we start inputs which will // consume messages to be routed RouteService routeService = entry.getValue().getRouteService(); - StartupStep step = abstractCamelContext.getStartupStepRecorder().beginStep(Route.class, routeService.getId(), + StartupStep step = camelContext.getStartupStepRecorder().beginStep(Route.class, routeService.getId(), "Warump Route"); try { LOG.debug("Warming up route id: {} having autoStartup={}", routeService.getId(), autoStartup); @@ -306,24 +308,29 @@ void doWarmUpRoutes(Map inputs, boolean autoS routeService.warmUp(); } finally { setupRoute.remove(); - abstractCamelContext.getStartupStepRecorder().endStep(step); + camelContext.getStartupStepRecorder().endStep(step); } } } - void doResumeRouteConsumers(Map inputs, boolean addingRoutes) throws Exception { - doStartOrResumeRouteConsumers(inputs, true, addingRoutes); + void doResumeRouteConsumers( + AbstractCamelContext camelContext, Map inputs, boolean addingRoutes) + throws Exception { + doStartOrResumeRouteConsumers(camelContext, inputs, true, addingRoutes); } - void doStartRouteConsumers(Map inputs, boolean addingRoutes) throws Exception { - doStartOrResumeRouteConsumers(inputs, false, addingRoutes); + void doStartRouteConsumers( + AbstractCamelContext camelContext, Map inputs, boolean addingRoutes) + throws Exception { + doStartOrResumeRouteConsumers(camelContext, inputs, false, addingRoutes); } - private LoggingLevel getRouteLoggerLogLevel() { - return abstractCamelContext.getRouteController().getLoggingLevel(); + private LoggingLevel getRouteLoggerLogLevel(AbstractCamelContext camelContext) { + return camelContext.getRouteController().getLoggingLevel(); } private void doStartOrResumeRouteConsumers( + AbstractCamelContext camelContext, Map inputs, boolean resumeOnly, boolean addingRoute) throws Exception { List routeInputs = new ArrayList<>(); @@ -339,11 +346,11 @@ private void doStartOrResumeRouteConsumers( if (addingRoute && !autoStartup) { routeLogger.log( "Skipping starting of route " + routeService.getId() + " as it's configured with autoStartup=false", - getRouteLoggerLogLevel()); + getRouteLoggerLogLevel(camelContext)); continue; } - StartupStep step = abstractCamelContext.getStartupStepRecorder().beginStep(Route.class, route.getRouteId(), + StartupStep step = camelContext.getStartupStepRecorder().beginStep(Route.class, route.getRouteId(), "Start Route"); // do some preparation before starting the consumer on the route @@ -360,13 +367,13 @@ private void doStartOrResumeRouteConsumers( // check for multiple consumer violations with existing routes // which have already been started, or is currently starting List existingEndpoints = new ArrayList<>(); - for (Route existingRoute : abstractCamelContext.getRoutes()) { + for (Route existingRoute : camelContext.getRoutes()) { if (route.getId().equals(existingRoute.getId())) { // skip ourselves continue; } Endpoint existing = existingRoute.getEndpoint(); - ServiceStatus status = abstractCamelContext.getRouteStatus(existingRoute.getId()); + ServiceStatus status = camelContext.getRouteStatus(existingRoute.getId()); if (status != null && (status.isStarted() || status.isStarting())) { existingEndpoints.add(existing); } @@ -391,14 +398,14 @@ private void doStartOrResumeRouteConsumers( String uri = endpoint.getEndpointBaseUri(); uri = URISupport.sanitizeUri(uri); routeLogger.log("Route: " + route.getId() + " resumed and consuming from: " + uri, - getRouteLoggerLogLevel()); + getRouteLoggerLogLevel(camelContext)); } else { // when starting we should invoke the lifecycle strategies - for (LifecycleStrategy strategy : abstractCamelContext.getLifecycleStrategies()) { - strategy.onServiceAdd(abstractCamelContext.getCamelContextReference(), consumer, route); + for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) { + strategy.onServiceAdd(camelContext.getCamelContextReference(), consumer, route); } try { - abstractCamelContext.startService(consumer); + camelContext.startService(consumer); route.getProperties().remove("route.start.exception"); } catch (Exception e) { route.getProperties().put("route.start.exception", e); @@ -409,7 +416,7 @@ private void doStartOrResumeRouteConsumers( String uri = endpoint.getEndpointBaseUri(); uri = URISupport.sanitizeUri(uri); routeLogger.log("Route: " + route.getId() + " started and consuming from: " + uri, - getRouteLoggerLogLevel()); + getRouteLoggerLogLevel(camelContext)); } routeInputs.add(endpoint); @@ -419,14 +426,14 @@ private void doStartOrResumeRouteConsumers( // but only add if we haven't already registered it before (we // dont want to double add when restarting) boolean found = false; - for (RouteStartupOrder other : abstractCamelContext.getCamelContextExtension().getRouteStartupOrder()) { + for (RouteStartupOrder other : camelContext.getCamelContextExtension().getRouteStartupOrder()) { if (other.getRoute().getId().equals(route.getId())) { found = true; break; } } if (!found) { - abstractCamelContext.getCamelContextExtension().getRouteStartupOrder().add(entry.getValue()); + camelContext.getCamelContextExtension().getRouteStartupOrder().add(entry.getValue()); } } @@ -444,7 +451,7 @@ private void doStartOrResumeRouteConsumers( } } - abstractCamelContext.getStartupStepRecorder().endStep(step); + camelContext.getStartupStepRecorder().endStep(step); } } @@ -468,4 +475,8 @@ private boolean doCheckMultipleConsumerSupportClash(Endpoint endpoint, List services = new CopyOnWriteArrayList<>(); - InternalServiceManager(CamelContext camelContext, InternalRouteStartupManager internalRouteStartupManager, - List startupListeners) { + InternalServiceManager(InternalRouteStartupManager internalRouteStartupManager, List startupListeners) { /* Note: this is an internal API and not meant to be public, so it uses assertion for lightweight nullability checking for extremely unlikely scenarios that should be found during development time. */ - assert camelContext != null : "the Camel context cannot be null"; assert internalRouteStartupManager != null : "the internalRouteStartupManager cannot be null"; assert startupListeners != null : "the startupListeners cannot be null"; - this.camelContext = camelContext; this.internalRouteStartupManager = internalRouteStartupManager; startupListeners.add(deferStartupListener); } - public T addService(T object) { - return addService(object, true); + public T addService(CamelContext camelContext, T object) { + return addService(camelContext, object, true); } - public T addService(T object, boolean stopOnShutdown) { - return addService(object, stopOnShutdown, true, true); + public T addService(CamelContext camelContext, T object, boolean stopOnShutdown) { + return addService(camelContext, object, stopOnShutdown, true, true); } - public T addService(T object, boolean stopOnShutdown, boolean forceStart, boolean useLifecycleStrategies) { + public T addService( + CamelContext camelContext, T object, boolean stopOnShutdown, boolean forceStart, boolean useLifecycleStrategies) { try { - doAddService(object, stopOnShutdown, forceStart, useLifecycleStrategies); + doAddService(camelContext, object, stopOnShutdown, forceStart, useLifecycleStrategies); } catch (Exception e) { throw RuntimeCamelException.wrapRuntimeCamelException(e); } return object; } - public void doAddService(Object object, boolean stopOnShutdown, boolean forceStart, boolean useLifecycleStrategies) + public void doAddService( + CamelContext camelContext, Object object, boolean stopOnShutdown, boolean forceStart, + boolean useLifecycleStrategies) throws Exception { if (object == null) { @@ -147,14 +146,14 @@ public void doAddService(Object object, boolean stopOnShutdown, boolean forceSta ServiceHelper.startService(service); } else { ServiceHelper.initService(service); - deferStartService(object, stopOnShutdown, true); + deferStartService(camelContext, object, stopOnShutdown, true); } } } } } - public void deferStartService(Object object, boolean stopOnShutdown, boolean startEarly) { + public void deferStartService(CamelContext camelContext, Object object, boolean stopOnShutdown, boolean startEarly) { if (object instanceof Service) { Service service = (Service) object; @@ -223,7 +222,7 @@ public T hasService(Class type) { return null; } - public void stopConsumers() { + public void stopConsumers(CamelContext camelContext) { for (Service service : services) { if (service instanceof Consumer) { InternalServiceManager.shutdownServices(camelContext, service); @@ -231,7 +230,7 @@ public void stopConsumers() { } } - public void shutdownServices() { + public void shutdownServices(CamelContext camelContext) { InternalServiceManager.shutdownServices(camelContext, services); services.clear(); } diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SharedCamelInternalProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SharedCamelInternalProcessor.java index f8b3de187e9d7..d64b2c174cb46 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SharedCamelInternalProcessor.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SharedCamelInternalProcessor.java @@ -160,9 +160,7 @@ public boolean process( states[j++] = state; } } catch (Exception e) { - exchange.setException(e); - originalCallback.done(true); - return true; + return handleException(exchange, originalCallback, e); } } @@ -170,63 +168,77 @@ public boolean process( AsyncCallback callback = new InternalCallback(states, exchange, originalCallback, resultProcessor); if (exchange.isTransacted()) { - // must be synchronized for transacted exchanges - if (LOG.isTraceEnabled()) { - if (exchange.isTransacted()) { - LOG.trace("Transacted Exchange must be routed synchronously for exchangeId: {} -> {}", - exchange.getExchangeId(), exchange); - } else { - LOG.trace("Synchronous UnitOfWork Exchange must be routed synchronously for exchangeId: {} -> {}", - exchange.getExchangeId(), exchange); - } - } - // ---------------------------------------------------------- - // CAMEL END USER - DEBUG ME HERE +++ START +++ - // ---------------------------------------------------------- - try { - processor.process(exchange); - } catch (Exception e) { - exchange.setException(e); - } - // ---------------------------------------------------------- - // CAMEL END USER - DEBUG ME HERE +++ END +++ - // ---------------------------------------------------------- - callback.done(true); - return true; + return processTransacted(exchange, processor, callback); } else { - final UnitOfWork uow = exchange.getUnitOfWork(); + return processNonTransacted(exchange, processor, callback); + } + } - // do uow before processing and if a value is returned then the uow wants to be processed after in the same thread - AsyncCallback async = callback; - boolean beforeAndAfter = uow.isBeforeAfterProcess(); - if (beforeAndAfter) { - async = uow.beforeProcess(processor, exchange, async); - } + private static boolean handleException(Exchange exchange, AsyncCallback originalCallback, Exception e) { + exchange.setException(e); + originalCallback.done(true); + return true; + } - // ---------------------------------------------------------- - // CAMEL END USER - DEBUG ME HERE +++ START +++ - // ---------------------------------------------------------- - if (LOG.isTraceEnabled()) { - LOG.trace("Processing exchange for exchangeId: {} -> {}", exchange.getExchangeId(), exchange); - } - boolean sync = processor.process(exchange, async); - // ---------------------------------------------------------- - // CAMEL END USER - DEBUG ME HERE +++ END +++ - // ---------------------------------------------------------- + private static boolean processNonTransacted(Exchange exchange, AsyncProcessor processor, AsyncCallback callback) { + final UnitOfWork uow = exchange.getUnitOfWork(); - // optimize to only do after uow processing if really needed - if (beforeAndAfter) { - // execute any after processor work (in current thread, not in the callback) - uow.afterProcess(processor, exchange, callback, sync); - } + // do uow before processing and if a value is returned then the uow wants to be processed after in the same thread + AsyncCallback async = callback; + boolean beforeAndAfter = uow.isBeforeAfterProcess(); + if (beforeAndAfter) { + async = uow.beforeProcess(processor, exchange, async); + } + + // ---------------------------------------------------------- + // CAMEL END USER - DEBUG ME HERE +++ START +++ + // ---------------------------------------------------------- + if (LOG.isTraceEnabled()) { + LOG.trace("Processing exchange for exchangeId: {} -> {}", exchange.getExchangeId(), exchange); + } + boolean sync = processor.process(exchange, async); + // ---------------------------------------------------------- + // CAMEL END USER - DEBUG ME HERE +++ END +++ + // ---------------------------------------------------------- + + // optimize to only do after uow processing if really needed + if (beforeAndAfter) { + // execute any after processor work (in current thread, not in the callback) + uow.afterProcess(processor, exchange, callback, sync); + } + + if (LOG.isTraceEnabled()) { + LOG.trace("Exchange processed and is continued routed {} for exchangeId: {} -> {}", + sync ? "synchronously" : "asynchronously", + exchange.getExchangeId(), exchange); + } + return sync; + } - if (LOG.isTraceEnabled()) { - LOG.trace("Exchange processed and is continued routed {} for exchangeId: {} -> {}", - sync ? "synchronously" : "asynchronously", + private static boolean processTransacted(Exchange exchange, AsyncProcessor processor, AsyncCallback callback) { + // must be synchronized for transacted exchanges + if (LOG.isTraceEnabled()) { + if (exchange.isTransacted()) { + LOG.trace("Transacted Exchange must be routed synchronously for exchangeId: {} -> {}", + exchange.getExchangeId(), exchange); + } else { + LOG.trace("Synchronous UnitOfWork Exchange must be routed synchronously for exchangeId: {} -> {}", exchange.getExchangeId(), exchange); } - return sync; } + // ---------------------------------------------------------- + // CAMEL END USER - DEBUG ME HERE +++ START +++ + // ---------------------------------------------------------- + try { + processor.process(exchange); + } catch (Exception e) { + exchange.setException(e); + } + // ---------------------------------------------------------- + // CAMEL END USER - DEBUG ME HERE +++ END +++ + // ---------------------------------------------------------- + callback.done(true); + return true; } /** diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/StreamCachingHelper.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/StreamCachingHelper.java index 63fba73c03560..d9ba7c9e0b296 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/StreamCachingHelper.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/StreamCachingHelper.java @@ -45,12 +45,7 @@ public static StreamCache convertToStreamCache(StreamCachingStrategy strategy, E return sc; } } catch (Exception e) { - // lets allow Camels error handler to deal with stream cache failures - StreamCacheException tce = new StreamCacheException(null, e); - exchange.setException(tce); - // because this is stream caching error then we cannot use redelivery as the message body is corrupt - // so mark as redelivery exhausted - exchange.getExchangeExtension().setRedeliveryExhausted(true); + handleException(exchange, null, e); } // check if we somewhere failed due to a stream caching exception Throwable cause = exchange.getException(); @@ -76,15 +71,23 @@ private static StreamCache tryStreamCache( } return sc; } catch (Exception e) { - // lets allow Camels error handler to deal with stream cache failures - StreamCacheException tce = new StreamCacheException(exchange.getMessage().getBody(), e); - exchange.setException(tce); - // because this is stream caching error then we cannot use redelivery as the message body is corrupt - // so mark as redelivery exhausted - exchange.getExchangeExtension().setRedeliveryExhausted(true); + handleException(exchange, e); } } return null; } + private static void handleException(Exchange exchange, Exception e) { + handleException(exchange, exchange.getMessage().getBody(), e); + } + + private static void handleException(Exchange exchange, Object value, Exception e) { + // lets allow Camels error handler to deal with stream cache failures + StreamCacheException tce = new StreamCacheException(value, e); + exchange.setException(tce); + // because this is stream caching error then we cannot use redelivery as the message body is corrupt + // so mark as redelivery exhausted + exchange.getExchangeExtension().setRedeliveryExhausted(true); + } + } diff --git a/core/camel-core-model/src/main/java/org/apache/camel/builder/AdviceWith.java b/core/camel-core-model/src/main/java/org/apache/camel/builder/AdviceWith.java index 63ddec470548a..4b210df3f82d2 100644 --- a/core/camel-core-model/src/main/java/org/apache/camel/builder/AdviceWith.java +++ b/core/camel-core-model/src/main/java/org/apache/camel/builder/AdviceWith.java @@ -239,9 +239,9 @@ private static RouteDefinition doAdviceWith(RouteDefinition definition, CamelCon } String beforeAsXml = null; - final ModelToXMLDumper modelToXMLDumper = PluginHelper.getModelToXMLDumper(ecc); if (logRoutesAsXml && LOG.isInfoEnabled()) { try { + ModelToXMLDumper modelToXMLDumper = PluginHelper.getModelToXMLDumper(ecc); beforeAsXml = modelToXMLDumper.dumpModelAsXml(camelContext, definition); } catch (Exception e) { // ignore, it may be due jaxb is not on classpath etc @@ -278,6 +278,7 @@ private static RouteDefinition doAdviceWith(RouteDefinition definition, CamelCon if (beforeAsXml != null && logRoutesAsXml && LOG.isInfoEnabled()) { try { + ModelToXMLDumper modelToXMLDumper = PluginHelper.getModelToXMLDumper(ecc); String afterAsXml = modelToXMLDumper.dumpModelAsXml(camelContext, merged); LOG.info("Adviced route before/after as XML:\n{}\n\n{}", beforeAsXml, afterAsXml); } catch (Exception e) { diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java index 2a41f1219014a..f85d98e9481c9 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java @@ -420,55 +420,28 @@ public void done(boolean doneSync) { @Override public void run() { // can we still run - boolean run = true; - if (shutdownStrategy.isForceShutdown()) { - run = false; - } - if (run && isStoppingOrStopped()) { - run = false; - } - if (!run) { - LOG.trace("Run not allowed, will reject executing exchange: {}", exchange); - if (exchange.getException() == null) { - exchange.setException(new RejectedExecutionException()); - } - AsyncCallback cb = callback; - taskFactory.release(this); - cb.done(false); + if (shutdownStrategy.isForceShutdown() || isStoppingOrStopped()) { + runNotAllowed(); return; } + if (exchange.getExchangeExtension().isInterrupted()) { - // mark the exchange to stop continue routing when interrupted - // as we do not want to continue routing (for example a task has been cancelled) - if (LOG.isTraceEnabled()) { - LOG.trace("Is exchangeId: {} interrupted? true", exchange.getExchangeId()); - } - exchange.setRouteStop(true); - // we should not continue routing so call callback - AsyncCallback cb = callback; - taskFactory.release(this); - cb.done(false); + runInterrupted(); return; } // only new failure if the exchange has exception // and it has not been handled by the failure processor before // or not exhausted - boolean failure = exchange.getException() != null + final boolean failure = exchange.getException() != null && !exchange.getExchangeExtension().isRedeliveryExhausted() && !ExchangeHelper.isFailureHandled(exchange); // error handled bridged - boolean bridge = ExchangeHelper.isErrorHandlerBridge(exchange); + final boolean bridge = ExchangeHelper.isErrorHandlerBridge(exchange); if (failure || bridge) { // previous processing cause an exception - handleException(); - onExceptionOccurred(); - prepareExchangeAfterFailure(exchange); - // we do not support redelivery so continue callback - AsyncCallback cb = callback; - taskFactory.release(this); - reactiveExecutor.schedule(cb); + handlePreviousFailure(); } else if (first) { // first time call the target processor first = false; @@ -481,6 +454,39 @@ public void run() { } } + private void handlePreviousFailure() { + handleException(); + onExceptionOccurred(); + prepareExchangeAfterFailure(exchange); + // we do not support redelivery so continue callback + AsyncCallback cb = callback; + taskFactory.release(this); + reactiveExecutor.schedule(cb); + } + + private void runInterrupted() { + // mark the exchange to stop continue routing when interrupted + // as we do not want to continue routing (for example a task has been cancelled) + if (LOG.isTraceEnabled()) { + LOG.trace("Is exchangeId: {} interrupted? true", exchange.getExchangeId()); + } + exchange.setRouteStop(true); + // we should not continue routing so call callback + AsyncCallback cb = callback; + taskFactory.release(this); + cb.done(false); + } + + private void runNotAllowed() { + LOG.trace("Run not allowed, will reject executing exchange: {}", exchange); + if (exchange.getException() == null) { + exchange.setException(new RejectedExecutionException()); + } + AsyncCallback cb = callback; + taskFactory.release(this); + cb.done(false); + } + protected void handleException() { Exception e = exchange.getException(); // e is never null diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextSuspendResumeRouteTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextSuspendResumeRouteTest.java index 6daf41f2ac1c2..d9be8fb14dd92 100644 --- a/core/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextSuspendResumeRouteTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextSuspendResumeRouteTest.java @@ -22,6 +22,7 @@ import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.awaitility.Awaitility; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -49,9 +50,9 @@ public void testSuspendResume() throws Exception { context.suspend(); // need to give seda consumer thread time to idle - Awaitility.await().atMost(100, TimeUnit.MILLISECONDS) - .pollDelay(10, TimeUnit.MILLISECONDS) - .untilAsserted(() -> template.sendBody("seda:foo", "B")); + Awaitility.await().atMost(200, TimeUnit.MILLISECONDS) + .pollDelay(100, TimeUnit.MILLISECONDS) + .untilAsserted(() -> Assertions.assertDoesNotThrow(() -> template.sendBody("seda:foo", "B"))); mock.assertIsSatisfied(1000); diff --git a/core/camel-main/src/main/java/org/apache/camel/main/MainDurationEventNotifier.java b/core/camel-main/src/main/java/org/apache/camel/main/MainDurationEventNotifier.java index 17fd88f0508ab..1da8330822cf8 100644 --- a/core/camel-main/src/main/java/org/apache/camel/main/MainDurationEventNotifier.java +++ b/core/camel-main/src/main/java/org/apache/camel/main/MainDurationEventNotifier.java @@ -33,19 +33,39 @@ * maximum number of messages has been processed. */ public class MainDurationEventNotifier extends EventNotifierSupport { - private static final Logger LOG = LoggerFactory.getLogger(MainLifecycleStrategy.class); + + private enum Action { + SHUTDOWN, + STOP; + + static Action toAction(String action) { + if ("shutdown".equals(action)) { + return SHUTDOWN; + } + + if ("stop".equals(action)) { + return STOP; + } + + LOG.warn("Invalid action: {}. Main execution will be aborted during initialization", action); + return null; + } + } + + private final CamelContext camelContext; private final int maxMessages; private final long maxIdleSeconds; private final MainShutdownStrategy shutdownStrategy; private final boolean stopCamelContext; private final boolean restartDuration; - private final String action; + private final Action action; private final LongAdder doneMessages; private volatile StopWatch watch; private volatile ScheduledExecutorService idleExecutorService; + public MainDurationEventNotifier(CamelContext camelContext, int maxMessages, long maxIdleSeconds, MainShutdownStrategy shutdownStrategy, boolean stopCamelContext, boolean restartDuration, String action) { @@ -55,7 +75,7 @@ public MainDurationEventNotifier(CamelContext camelContext, int maxMessages, lon this.shutdownStrategy = shutdownStrategy; this.stopCamelContext = stopCamelContext; this.restartDuration = restartDuration; - this.action = action.toLowerCase(); + this.action = Action.toAction(action); this.doneMessages = new LongAdder(); if (maxMessages == 0 && maxIdleSeconds == 0) { @@ -79,17 +99,8 @@ protected void doNotify(CamelEvent event) { return; } - final boolean reloaded = event.getType() == CamelEvent.Type.RouteReloaded; - - if (reloaded) { - if (restartDuration) { - LOG.debug("Routes reloaded. Resetting maxMessages/maxIdleSeconds/maxSeconds"); - shutdownStrategy.restartAwait(); - doneMessages.reset(); - if (watch != null) { - watch.restart(); - } - } + if (event.getType() == CamelEvent.Type.RouteReloaded) { + resetOnReload(); return; } @@ -108,30 +119,49 @@ protected void doNotify(CamelEvent event) { } if (result && shutdownStrategy.isRunAllowed()) { - if ("shutdown".equalsIgnoreCase(action)) { - LOG.info("Duration max messages triggering shutdown of the JVM"); - // use thread to shut down Camel as otherwise we would block current thread - camelContext.getExecutorServiceManager().newThread("CamelMainShutdownCamelContext", this::shutdownTask) - .start(); - } else if ("stop".equalsIgnoreCase(action)) { - LOG.info("Duration max messages triggering stopping all routes"); - // use thread to stop routes as otherwise we would block current thread - camelContext.getExecutorServiceManager().newThread("CamelMainShutdownCamelContext", this::stopTask) - .start(); - } + triggerDoneEvent(); } } } // idle reacts on both incoming and complete messages if (maxIdleSeconds > 0) { - final boolean begin = event.getType() == CamelEvent.Type.ExchangeCreated; + resetOnActivity(event, complete); + } + } - if (begin || complete) { - if (watch != null) { - LOG.trace("Message activity so restarting stop watch"); - watch.restart(); - } + private void triggerDoneEvent() { + if (action == Action.SHUTDOWN) { + LOG.info("Duration max messages triggering shutdown of the JVM"); + // use thread to shut down Camel as otherwise we would block current thread + camelContext.getExecutorServiceManager().newThread("CamelMainShutdownCamelContext", this::shutdownTask) + .start(); + } else if (action == Action.STOP) { + LOG.info("Duration max messages triggering stopping all routes"); + // use thread to stop routes as otherwise we would block current thread + camelContext.getExecutorServiceManager().newThread("CamelMainShutdownCamelContext", this::stopTask) + .start(); + } + } + + private void resetOnActivity(CamelEvent event, boolean complete) { + final boolean created = event.getType() == CamelEvent.Type.ExchangeCreated; + + if (created || complete) { + if (watch != null) { + LOG.trace("Message activity so restarting stop watch"); + watch.restart(); + } + } + } + + private void resetOnReload() { + if (restartDuration) { + LOG.debug("Routes reloaded. Resetting maxMessages/maxIdleSeconds/maxSeconds"); + shutdownStrategy.restartAwait(); + doneMessages.reset(); + if (watch != null) { + watch.restart(); } } } @@ -151,7 +181,7 @@ public String toString() { protected void doInit() throws Exception { super.doInit(); - if (!action.equals("shutdown") && !action.equals("stop")) { + if (action == null) { throw new IllegalArgumentException("Unknown action: " + action); } } @@ -228,15 +258,19 @@ private void idleTask() { LOG.trace("Duration max idle check {} >= {} -> {}", seconds, maxIdleSeconds, result); if (result && shutdownStrategy.isRunAllowed()) { - if ("shutdown".equals(action)) { - LOG.info("Duration max idle triggering shutdown of the JVM"); - // use thread to stop Camel as otherwise we would block current thread - camelContext.getExecutorServiceManager().newThread("CamelMainShutdownCamelContext", this::shutdownTask).start(); - } else if ("stop".equals(action)) { - LOG.info("Duration max idle triggering stopping all routes"); - // use thread to stop Camel as otherwise we would block current thread - camelContext.getExecutorServiceManager().newThread("CamelMainShutdownCamelContext", this::stopTask).start(); - } + triggerIdleEvent(); + } + } + + private void triggerIdleEvent() { + if (action == Action.SHUTDOWN) { + LOG.info("Duration max idle triggering shutdown of the JVM"); + // use thread to stop Camel as otherwise we would block current thread + camelContext.getExecutorServiceManager().newThread("CamelMainShutdownCamelContext", this::shutdownTask).start(); + } else if (action == Action.STOP) { + LOG.info("Duration max idle triggering stopping all routes"); + // use thread to stop Camel as otherwise we would block current thread + camelContext.getExecutorServiceManager().newThread("CamelMainShutdownCamelContext", this::stopTask).start(); } } } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java index 32cd22eec71e2..0a2da86ec14bc 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java @@ -34,6 +34,7 @@ import org.apache.camel.CamelExecutionException; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; +import org.apache.camel.ExchangeExtension; import org.apache.camel.ExchangePattern; import org.apache.camel.ExchangePropertyKey; import org.apache.camel.Message; @@ -343,58 +344,74 @@ public static void copyResultsPreservePattern(Exchange target, Exchange source) private static void doCopyResults(Exchange result, Exchange source, boolean preserverPattern) { if (result == source) { - // we just need to ensure MEP is as expected (eg copy result to OUT if out capable) - // and the result is not failed - if (result.getPattern().isOutCapable() && !result.hasOut() && !result.isFailed()) { - // copy IN to OUT as we expect a OUT response - result.getOut().copyFrom(source.getIn()); - } + copyFromOutMessageConditionally(result, source); return; } if (source.hasOut()) { - if (preserverPattern) { - // exchange pattern sensitive - Message resultMessage = getResultMessage(result); - resultMessage.copyFrom(source.getOut()); - } else { - result.getOut().copyFrom(source.getOut()); - } + copyFromOutMessage(result, source, preserverPattern); } else { - // no results so lets copy the last input - // as the final processor on a pipeline might not - // have created any OUT; such as a mock:endpoint - // so lets assume the last IN is the OUT - if (!preserverPattern && result.getPattern().isOutCapable()) { - // only set OUT if its OUT capable - result.getOut().copyFrom(source.getIn()); - } else { - // if not replace IN instead to keep the MEP - result.getIn().copyFrom(source.getIn()); - // clear any existing OUT as the result is on the IN - if (result.hasOut()) { - result.setOut(null); - } - } + copyFromInMessage(result, source, preserverPattern); } if (source.hasProperties()) { result.getProperties().putAll(source.getProperties()); } - source.getExchangeExtension().copyInternalProperties(result); - source.getExchangeExtension().copySafeCopyPropertiesTo(result.getExchangeExtension()); + + final ExchangeExtension sourceExtension = source.getExchangeExtension(); + sourceExtension.copyInternalProperties(result); + + final ExchangeExtension resultExtension = result.getExchangeExtension(); + sourceExtension.copySafeCopyPropertiesTo(resultExtension); // copy over state result.setRouteStop(source.isRouteStop()); result.setRollbackOnly(source.isRollbackOnly()); result.setRollbackOnlyLast(source.isRollbackOnlyLast()); - result.getExchangeExtension().setNotifyEvent(source.getExchangeExtension().isNotifyEvent()); - result.getExchangeExtension().setRedeliveryExhausted(source.getExchangeExtension().isRedeliveryExhausted()); - result.getExchangeExtension().setErrorHandlerHandled(source.getExchangeExtension().getErrorHandlerHandled()); + resultExtension.setNotifyEvent(sourceExtension.isNotifyEvent()); + resultExtension.setRedeliveryExhausted(sourceExtension.isRedeliveryExhausted()); + resultExtension.setErrorHandlerHandled(sourceExtension.getErrorHandlerHandled()); result.setException(source.getException()); } + private static void copyFromOutMessageConditionally(Exchange result, Exchange source) { + // we just need to ensure MEP is as expected (eg copy result to OUT if out capable) + // and the result is not failed + if (result.getPattern().isOutCapable() && !result.hasOut() && !result.isFailed()) { + // copy IN to OUT as we expect a OUT response + result.getOut().copyFrom(source.getIn()); + } + } + + private static void copyFromInMessage(Exchange result, Exchange source, boolean preserverPattern) { + // no results so lets copy the last input + // as the final processor on a pipeline might not + // have created any OUT; such as a mock:endpoint + // so lets assume the last IN is the OUT + if (!preserverPattern && result.getPattern().isOutCapable()) { + // only set OUT if its OUT capable + result.getOut().copyFrom(source.getIn()); + } else { + // if not replace IN instead to keep the MEP + result.getIn().copyFrom(source.getIn()); + // clear any existing OUT as the result is on the IN + if (result.hasOut()) { + result.setOut(null); + } + } + } + + private static void copyFromOutMessage(Exchange result, Exchange source, boolean preserverPattern) { + if (preserverPattern) { + // exchange pattern sensitive + Message resultMessage = getResultMessage(result); + resultMessage.copyFrom(source.getOut()); + } else { + result.getOut().copyFrom(source.getOut()); + } + } + /** * Returns the message where to write results in an exchange-pattern-sensitive way. * diff --git a/core/camel-support/src/main/java/org/apache/camel/support/MessageSupport.java b/core/camel-support/src/main/java/org/apache/camel/support/MessageSupport.java index 97caf1aa73090..f1813d0b32bed 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/MessageSupport.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/MessageSupport.java @@ -217,12 +217,7 @@ public void copyFromWithNewBody(Message that, Object newBody) { // the headers may be the same instance if the end user has made some mistake // and set the OUT message with the same header instance of the IN message etc - boolean sameHeadersInstance = false; - if (hasHeaders() && that.hasHeaders() && getHeaders() == that.getHeaders()) { - sameHeadersInstance = true; - } - - if (!sameHeadersInstance) { + if (!sameHeaders(that)) { if (hasHeaders()) { // okay its safe to clear the headers getHeaders().clear(); @@ -233,6 +228,10 @@ public void copyFromWithNewBody(Message that, Object newBody) { } } + private boolean sameHeaders(Message that) { + return hasHeaders() && that.hasHeaders() && getHeaders() == that.getHeaders(); + } + @Override public Exchange getExchange() { return exchange; diff --git a/core/camel-support/src/main/java/org/apache/camel/support/UnitOfWorkHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/UnitOfWorkHelper.java index 5450dddfcd5d2..e3b32b56a20fc 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/UnitOfWorkHelper.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/UnitOfWorkHelper.java @@ -64,12 +64,7 @@ public static void doneSynchronizations(Exchange exchange, List if (synchronizations.size() > 1) { // work on a copy of the list to avoid any modification which may cause ConcurrentModificationException - List copy = new ArrayList<>(synchronizations); - - // reverse so we invoke it FILO style instead of FIFO - Collections.reverse(copy); - // and honor if any was ordered by sorting it accordingly - copy.sort(OrderedComparator.get()); + final List copy = safeCopy(synchronizations); boolean failed = exchange.isFailed(); @@ -101,6 +96,13 @@ private static void doneSynchronization(Synchronization synchronization, Exchang public static void beforeRouteSynchronizations( Route route, Exchange exchange, List synchronizations, Logger log) { // work on a copy of the list to avoid any modification which may cause ConcurrentModificationException + final List copy = safeCopy(synchronizations); + + // invoke synchronization callbacks + invokeSynchronizationCallbacks(route, exchange, log, copy); + } + + private static List safeCopy(List synchronizations) { List copy = new ArrayList<>(synchronizations); if (copy.size() > 1) { @@ -109,8 +111,10 @@ public static void beforeRouteSynchronizations( // and honor if any was ordered by sorting it accordingly copy.sort(OrderedComparator.get()); } + return copy; + } - // invoke synchronization callbacks + private static void invokeSynchronizationCallbacks(Route route, Exchange exchange, Logger log, List copy) { for (Synchronization synchronization : copy) { final SynchronizationRouteAware routeSynchronization = synchronization.getRouteSynchronization(); if (routeSynchronization != null) { @@ -128,14 +132,7 @@ public static void beforeRouteSynchronizations( public static void afterRouteSynchronizations( Route route, Exchange exchange, List synchronizations, Logger log) { // work on a copy of the list to avoid any modification which may cause ConcurrentModificationException - List copy = new ArrayList<>(synchronizations); - - if (copy.size() > 1) { - // reverse so we invoke it FILO style instead of FIFO - Collections.reverse(copy); - // and honor if any was ordered by sorting it accordingly - copy.sort(OrderedComparator.get()); - } + final List copy = safeCopy(synchronizations); // invoke synchronization callbacks for (Synchronization synchronization : copy) { diff --git a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/BeanComponentBuilderFactory.java b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/BeanComponentBuilderFactory.java index f3e3faa609bdf..bf436a19e0a8b 100644 --- a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/BeanComponentBuilderFactory.java +++ b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/BeanComponentBuilderFactory.java @@ -120,6 +120,22 @@ default BeanComponentBuilder autowiredEnabled(boolean autowiredEnabled) { doSetProperty("autowiredEnabled", autowiredEnabled); return this; } + /** + * Maximum cache size of internal cache for bean introspection. Setting + * a value of 0 or negative will disable the cache. + * + * The option is a: <code>int</code> type. + * + * Default: 1000 + * Group: advanced + * + * @param beanInfoCacheSize the value to set + * @return the dsl builder + */ + default BeanComponentBuilder beanInfoCacheSize(int beanInfoCacheSize) { + doSetProperty("beanInfoCacheSize", beanInfoCacheSize); + return this; + } } class BeanComponentBuilderImpl @@ -140,6 +156,7 @@ protected boolean setPropertyOnComponent( case "lazyStartProducer": ((BeanComponent) component).setLazyStartProducer((boolean) value); return true; case "scope": ((BeanComponent) component).setScope((org.apache.camel.BeanScope) value); return true; case "autowiredEnabled": ((BeanComponent) component).setAutowiredEnabled((boolean) value); return true; + case "beanInfoCacheSize": ((BeanComponent) component).setBeanInfoCacheSize((int) value); return true; default: return false; } } diff --git a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/ClasComponentBuilderFactory.java b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/ClasComponentBuilderFactory.java index 0713d7b5ac51f..002a7f646ff3d 100644 --- a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/ClasComponentBuilderFactory.java +++ b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/ClasComponentBuilderFactory.java @@ -120,6 +120,22 @@ default ClasComponentBuilder autowiredEnabled(boolean autowiredEnabled) { doSetProperty("autowiredEnabled", autowiredEnabled); return this; } + /** + * Maximum cache size of internal cache for bean introspection. Setting + * a value of 0 or negative will disable the cache. + * + * The option is a: <code>int</code> type. + * + * Default: 1000 + * Group: advanced + * + * @param beanInfoCacheSize the value to set + * @return the dsl builder + */ + default ClasComponentBuilder beanInfoCacheSize(int beanInfoCacheSize) { + doSetProperty("beanInfoCacheSize", beanInfoCacheSize); + return this; + } } class ClasComponentBuilderImpl @@ -140,6 +156,7 @@ protected boolean setPropertyOnComponent( case "lazyStartProducer": ((ClassComponent) component).setLazyStartProducer((boolean) value); return true; case "scope": ((ClassComponent) component).setScope((org.apache.camel.BeanScope) value); return true; case "autowiredEnabled": ((ClassComponent) component).setAutowiredEnabled((boolean) value); return true; + case "beanInfoCacheSize": ((ClassComponent) component).setBeanInfoCacheSize((int) value); return true; default: return false; } } diff --git a/parent/pom.xml b/parent/pom.xml index 54e8218d10a90..c4de9d7a06769 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -68,7 +68,7 @@ 1.11.3 1.11.3 4.2.0 - 2.21.1 + 2.21.2 2.14.0 1.2.17 12.0.0-beta.18 @@ -174,9 +174,9 @@ v4-rev20230526-2.0.0 v1-rev20230612-2.0.0 1.34.1 - 26.23.0 - 2.26.0 - 2.24.0 + 26.25.0 + 2.30.0 + 2.28.0 23.0.1 21.0 2.0.0 @@ -295,7 +295,7 @@ 1.1.3 3.5.1 1.9.0 - 6.8.1 + 6.9.0 6.8.1 1.17.0 1.8 @@ -304,7 +304,7 @@ 8.13.20 1.0.0 - 2.20.0 + 2.21.0 1.4.8 9.7.0 0.2.0 @@ -446,7 +446,7 @@ 2.9.0 9.9.1 4.1.2 - 2.3.9.Final + 2.3.10.Final 2.9.1 2.0.1.Final 0.10.4