Skip to content

Commit

Permalink
Merge pull request #2 from muryoh/concurrentHashMap
Browse files Browse the repository at this point in the history
Optimizations for heavy multithreaded usage
They're a bit rough around the corner and will need some additional cleanup
  • Loading branch information
muryoh authored Nov 28, 2019
2 parents d79f53d + d0b09c3 commit 6620b58
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 65 deletions.
12 changes: 6 additions & 6 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
<packaging>bundle</packaging>
<name>Apache Felix iPOJO</name>
<artifactId>org.apache.felix.ipojo</artifactId>
<version>1.12.1-ullink1</version>
<version>1.12.1.2-ullink-SNAPSHOT</version>

<properties>
<!--
Expand Down Expand Up @@ -129,10 +129,10 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>3.0</version>
<configuration>
<target>1.5</target>
<source>1.5</source>
<testTarget>1.5</testTarget>
<testSource>1.5</testSource>
<target>1.6</target>
<source>1.6</source>
<testTarget>1.6</testTarget>
<testSource>1.6</testSource>
</configuration>
</plugin>
<plugin>
Expand Down Expand Up @@ -357,7 +357,7 @@
<configuration>
<signature>
<groupId>org.codehaus.mojo.signature</groupId>
<artifactId>java15</artifactId>
<artifactId>java16</artifactId>
<version>1.0</version>
</signature>
</configuration>
Expand Down
63 changes: 42 additions & 21 deletions core/src/main/java/org/apache/felix/ipojo/InstanceManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import java.lang.reflect.*;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

/**
* This class defines the container of primitive instances. It manages content initialization
Expand Down Expand Up @@ -141,13 +142,38 @@ public class InstanceManager implements ComponentInstance, InstanceStateListener
* The map of [field, value], storing POJO managed
* field value.
*/
private Map m_fields = new HashMap();
private Map m_fields = new ConcurrentHashMap();

/**
* The Map storing the Method objects by ids.
* [id=>{@link Method}].
*/
private Map m_methods = Collections.synchronizedMap(new HashMap());
private Map m_methods = new ConcurrentHashMap();
private static Member NO_METHOD = new Member() {
@Override
public Class<?> getDeclaringClass()
{
return null;
}

@Override
public String getName()
{
return null;
}

@Override
public int getModifiers()
{
return 0;
}

@Override
public boolean isSynthetic()
{
return false;
}
};

/**
* The instance's bundle context.
Expand Down Expand Up @@ -1180,10 +1206,7 @@ public void register(int index, ConstructorInjector injector) throws Configurati
* @return the value decided by the last asked handler (throws a warning if two fields decide two different values)
*/
public Object onGet(Object pojo, String fieldName) {
Object initialValue = null;
synchronized (this) { // Stack confinement.
initialValue = m_fields.get(fieldName);
}
Object initialValue = m_fields.get(fieldName);
Object result = initialValue;
boolean hasChanged = false;
// Get the list of registered handlers
Expand Down Expand Up @@ -1211,9 +1234,7 @@ public Object onGet(Object pojo, String fieldName) {
if (hasChanged) {
// A change occurs => notify the change
//TODO consider just changing the reference, however multiple thread can be an issue
synchronized (this) {
m_fields.put(fieldName, result);
}
m_fields.put(fieldName, result);
// Call onset outside of a synchronized block.
for (int i = 0; list != null && i < list.length; i++) {
list[i].onSet(pojo, fieldName, result);
Expand Down Expand Up @@ -1306,9 +1327,12 @@ public void onError(Object pojo, String methodId, Throwable error) {
* @param methodId the method id
* @return the method object or <code>null</code> if the method cannot be found.
*/
private Member getMethodById(String methodId) {
private Member getMethodById(final String methodId) {
// Used a synchronized map.
Member member = (Member) m_methods.get(methodId);
if (member == NO_METHOD) {
member = null;
}
if (!m_methods.containsKey(methodId) && m_clazz != null) {
// Is it a inner class method
if (methodId.contains("___")) { // Mark to detect a inner class method.
Expand All @@ -1318,29 +1342,30 @@ private Member getMethodById(String methodId) {
return null;
} else {
String innerClassName = split[0];
methodId = split[1];
String innerMethodName = split[1];

// We can't find the member objects from anonymous methods, identified by their numeric name
// Just escaping in this case.
if (innerClassName.matches("-?\\d+")) {
m_methods.put(methodId, null);
m_methods.put(methodId, NO_METHOD);
return null;
}

for (Class c : m_clazz.getDeclaredClasses()) {
if (innerClassName.equals(c.getSimpleName())) {
Method[] mets = c.getDeclaredMethods();
for (Method met : mets) {
if (MethodMetadata.computeMethodId(met).equals(methodId)) {
if (MethodMetadata.computeMethodId(met).equals(innerMethodName)) {
// Store the new methodId
m_methods.put(methodId, met);
return met;
}
}
}
m_logger.log(Logger.INFO, "Cannot find the member associated to " + methodId + " - reason: " +
"cannot find the class " + innerClassName + " declared in " + m_clazz.getName());
}
m_logger.log(Logger.INFO, "Cannot find the member associated to " + methodId + " - reason: " +
"cannot find the class " + innerClassName + " declared in " + m_clazz.getName());
return null;
}
}

Expand Down Expand Up @@ -1388,12 +1413,8 @@ private Member getMethodById(String methodId) {
* @param objectValue the new value of the field
*/
public void onSet(final Object pojo, final String fieldName, final Object objectValue) {
synchronized (this) {
// First, store the new value.
// This must be done in a synchronized block to avoid
// concurrent modification
m_fields.put(fieldName, objectValue);
}
// First, store the new value.
m_fields.put(fieldName, objectValue);
// The registrations cannot be modified, so we can directly access
// the interceptor list.
FieldInterceptor[] list = (FieldInterceptor[]) m_fieldRegistration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,9 @@ public boolean isServiceLevelRequirement() {
* @see org.apache.felix.ipojo.util.DependencyModel#onServiceArrival(org.osgi.framework.ServiceReference)
*/
public void onServiceArrival(ServiceReference reference) {
if (m_usage != null) {
m_usage.markOutdated();
}
callBindMethod(reference);
//The method is only called when a new service arrives, or when the used one is replaced.
}
Expand All @@ -576,6 +579,9 @@ public void onServiceArrival(ServiceReference reference) {
* @see org.apache.felix.ipojo.util.DependencyModel#onServiceModification(org.osgi.framework.ServiceReference)
*/
public void onServiceModification(ServiceReference reference) {
if (m_usage != null) {
m_usage.markOutdated();
}
callModifyMethod(reference);
}

Expand All @@ -586,6 +592,9 @@ public void onServiceModification(ServiceReference reference) {
* @see org.apache.felix.ipojo.util.DependencyModel#onServiceDeparture(org.osgi.framework.ServiceReference)
*/
public void onServiceDeparture(ServiceReference ref) {
if (m_usage != null) {
m_usage.markOutdated();
}
callUnbindMethod(ref);
}

Expand All @@ -599,6 +608,9 @@ public void onServiceDeparture(ServiceReference ref) {
* @see org.apache.felix.ipojo.util.DependencyModel#onDependencyReconfiguration(org.osgi.framework.ServiceReference[], org.osgi.framework.ServiceReference[])
*/
public void onDependencyReconfiguration(ServiceReference[] departs, ServiceReference[] arrivals) {
if (m_usage != null) {
m_usage.markOutdated();
}
for (int i = 0; departs != null && i < departs.length; i++) {
callUnbindMethod(departs[i]);
}
Expand Down Expand Up @@ -661,13 +673,15 @@ public Object getService() {
Usage usage = (Usage) m_usage.get();
if (usage.m_stack == 0) { // uninitialized usage.
if (usage.m_componentStack > 0) {
// We comes from the component who didn't touch the service.
// So we initialize the usage.
createServiceObject(usage);
if (!usage.isUpToDate()) {
// We comes from the component who didn't touch the service.
// So we initialize the usage.
createServiceObject(usage);
}
usage.inc(); // Start the caching, so set the stack level to 1
m_usage.set(usage); // Required by Dalvik.
if (isAggregate()) {
Object obj = usage.m_object;
Object obj = usage.getObject();
if (obj instanceof Set) {
List<Object> list = new ArrayList<Object>();
list.addAll((Set) obj);
Expand All @@ -677,7 +691,7 @@ public Object getService() {
return obj;
}
} else {
return usage.m_object;
return usage.getObject();
}
} else {
// External access => Immediate get.
Expand Down Expand Up @@ -707,7 +721,7 @@ public Object getService() {
// Use the copy.
// if the copy is a set, transform to a list
if (isAggregate()) {
Object obj = usage.m_object;
Object obj = usage.getObject();
if (obj instanceof Set) {
List<Object> list = new ArrayList<Object>();
list.addAll((Set) obj);
Expand All @@ -717,7 +731,7 @@ public Object getService() {
return obj;
}
} else {
return usage.m_object;
return usage.getObject();
}

}
Expand All @@ -738,12 +752,14 @@ public Object onGet(Object pojo, String fieldName, Object value) {
// Initialize the thread local object is not already touched.
Usage usage = m_usage.get();
if (usage.m_stack == 0) { // uninitialized usage.
createServiceObject(usage);
if (!usage.isUpToDate()) {
createServiceObject(usage);
}
usage.inc(); // Start the caching, so set the stack level to 1
m_usage.set(usage); // Required by Dalvik
}
if (!m_isProxy) {
return usage.m_object;
return usage.getObject();
} else {
return m_proxyObject;
}
Expand Down Expand Up @@ -777,64 +793,64 @@ private void createServiceObject(Usage usage) {
m_handler.warn("[" + m_handler.getInstanceManager().getInstanceName() + "] The dependency is not optional, however no service object can be injected in " + m_field + " -> " + getSpecification().getName());
createNullableObject();
}
usage.m_object = m_nullable; // Add null if the Nullable pattern is disabled.
usage.setObject(m_nullable); // Add null if the Nullable pattern is disabled.
} else {
ServiceReference ref = getServiceReference();
usage.m_object = getService(ref);
usage.setObject(getService(ref));
}
} else {
switch(m_type) {
case ARRAY:
try {
if (refs == null) {
usage.m_object = (Object[]) Array.newInstance(getSpecification(), 0); // Create an empty array.
usage.setObject((Object[]) Array.newInstance(getSpecification(), 0)); // Create an empty array.
} else {
// Use a reflective construction to avoid class cast exception. This method allows setting the component type.
Object[] objs = (Object[]) Array.newInstance(getSpecification(), refs.length);
for (int i = 0; i < refs.length; i++) {
ServiceReference ref = refs[i];
objs[i] = getService(ref);
}
usage.m_object = objs;
usage.setObject(objs);
}
} catch (ArrayStoreException e) {
throw new RuntimeException("Cannot create the array - Check that the bundle can access the service interface", e);
}
break;
case LIST:
if (refs == null) {
usage.m_object = Collections.emptyList();
usage.setObject(Collections.emptyList());
} else {
// Use a list to store service objects
List<Object> objs = new ArrayList<Object>(refs.length);
for (ServiceReference ref : refs) {
objs.add(getService(ref));
}
usage.m_object = objs;
usage.setObject(objs);
}
break;
case SET:
if (refs == null) {
usage.m_object = Collections.emptySet();
usage.setObject(Collections.emptySet());
} else {
// Use a vector to store service objects
Set<Object> objs = new HashSet<Object>(refs.length);
for (ServiceReference ref : refs) {
objs.add(getService(ref));
}
usage.m_object = objs;
usage.setObject(objs);
}
break;
case VECTOR:
if (refs == null) {
usage.m_object = new Vector(0); // Create an empty vector.
usage.setObject(new Vector(0)); // Create an empty vector.
} else {
// Use a vector to store service objects
Vector<Object> objs = new Vector<Object>(refs.length);
for (ServiceReference ref : refs) {
objs.add(getService(ref));
}
usage.m_object = objs;
usage.setObject(objs);
}
break;
}
Expand Down Expand Up @@ -934,12 +950,7 @@ public void onFinally(Object pojo, Member method) {
Usage usage = m_usage.get();
usage.decComponentStack();
if (usage.m_stack > 0) {
if (usage.dec()) {
// Exit the method flow => Release all objects
usage.clear();
// Also remove the thread local object.
m_usage.remove();
}
usage.dec();
}
}
}
Expand Down
Loading

0 comments on commit 6620b58

Please sign in to comment.