Skip to content

Commit

Permalink
pool: add network traffic flow markers
Browse files Browse the repository at this point in the history
Motivation:
To analyzing the network usage by LHC  experiments WLCG plans to use
packet and flow markers. Thus it required that dCache for begin, end and
optionally during transfer send markers.

See:
https://docs.google.com/document/d/1x9JsZ7iTj44Ta06IHdkwpv5Q2u4U2QGLWnUeN2Zf5ts
https://docs.google.com/document/d/1HTaNwv7huRqdNUvgHJTjlow8MivJgoknRUKgADNlvgY
https://docs.google.com/document/d/1aAnsujpZnxn3oIUL9JZxcw0ZpoJNVXkHp-Yo5oj-B8U

Modification:
Introduce transfer lifecycle that will send FlowMarkers at the transfer
start and end.

Result:
PoC flow markers

Acked-by: Albert Rossi
Acked-by: Lea Morschel
Target: master
Require-book: yes
Require-notes: yes
  • Loading branch information
kofemann committed Oct 3, 2024
1 parent fb734fb commit 4b7dbd4
Show file tree
Hide file tree
Showing 11 changed files with 740 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,9 @@ void disable(Throwable error) {
* Attach mover tho the client's NFSv41 session.
*
* @param session to attach to
* @return true if mover is new attached to the session.
*/
synchronized boolean attachSession(NFSv41Session session) {

if (_session == null) {
_session = session;
_session.getClient().attachState(_state);
Expand Down
6 changes: 6 additions & 0 deletions modules/dcache/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,12 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>

<dependency>
<groupId>com.github.erosb</groupId>
<artifactId>everit-json-schema</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
129 changes: 129 additions & 0 deletions modules/dcache/src/main/java/org/dcache/net/FlowMarker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/* dCache - http://www.dcache.org/
*
* Copyright (C) 2022 - 2024 Deutsches Elektronen-Synchrotron
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.dcache.net;

import com.google.common.net.InetAddresses;
import java.net.InetSocketAddress;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* See: https://docs.google.com/document/d/1x9JsZ7iTj44Ta06IHdkwpv5Q2u4U2QGLWnUeN2Zf5ts/edit#
*/
public class FlowMarker {

private final static Logger LOGGER = LoggerFactory.getLogger(FlowMarker.class);

public static class FlowMarkerBuilder {

private final static int VERSION = 1;

private final JSONObject payload = new JSONObject();
private final JSONObject lifecycle = new JSONObject();
private final JSONObject context = new JSONObject();
private final JSONObject flow = new JSONObject();

public FlowMarkerBuilder withStartedAt(Instant startTime) {
lifecycle.put("start-time", DateTimeFormatter.ISO_INSTANT.format(startTime));
return this;
}

public FlowMarkerBuilder withFinishedAt(Instant startTime) {
lifecycle.put("end-time", DateTimeFormatter.ISO_INSTANT.format(startTime));
return this;
}

public FlowMarkerBuilder withExperimentId(int id) {
context.put("experiment-id", id);
return this;
}

public FlowMarkerBuilder withActivityId(int id) {
context.put("activity-id", id);
return this;
}

public FlowMarkerBuilder wittApplication(String app) {
context.put("application", app);
return this;
}

public FlowMarkerBuilder withAFI(String afi) {
switch (afi) {
case "ipv6":
case "ipv4":
break;
default:
throw new IllegalArgumentException("AFI can be 'ipv4' or 'ipv6'");
}

flow.put("afi", afi);
return this;
}

public FlowMarkerBuilder withSource(InetSocketAddress addr) {
flow.put("src-ip", InetAddresses.toAddrString(addr.getAddress()));
flow.put("src-port", addr.getPort());
return this;
}

public FlowMarkerBuilder withDestination(InetSocketAddress addr) {
flow.put("dst-ip", InetAddresses.toAddrString(addr.getAddress()));
flow.put("dst-port", addr.getPort());
return this;
}

public FlowMarkerBuilder withProtocol(String proto) {
switch (proto) {
case "tcp":
case "udp":
break;
default:
throw new IllegalArgumentException("Protocol can be 'tcp' or 'udp'");
}

flow.put("protocol", proto);
return this;
}

public JSONObject build(String state) {

switch (state) {
case "start":
case "end":
case "ongoing":
break;
default:
throw new IllegalArgumentException("State can be 'start', 'ongoing' or 'end'");
}

payload.put("version", VERSION);
payload.put("flow-lifecycle", lifecycle);
payload.put("context", context);
payload.put("flow-id", flow);

lifecycle.put("state", state);
lifecycle.put("current-time", DateTimeFormatter.ISO_INSTANT.format(Instant.now()));

return payload;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import diskCacheV111.util.CacheException;
import diskCacheV111.vehicles.DoorTransferFinishedMessage;
import diskCacheV111.vehicles.IpProtocolInfo;
import diskCacheV111.vehicles.MoverInfoMessage;
import dmg.cells.nucleus.AbstractCellComponent;
import dmg.cells.nucleus.CellInfoProvider;
Expand All @@ -37,6 +38,7 @@
import java.util.function.Consumer;
import org.dcache.cells.CellStub;
import org.dcache.pool.movers.Mover;
import org.dcache.pool.movers.TransferLifeCycle;
import org.dcache.pool.repository.ModifiableReplicaDescriptor;
import org.dcache.pool.repository.ReplicaDescriptor;
import org.dcache.pool.statistics.DirectedIoStatistics;
Expand Down Expand Up @@ -71,6 +73,8 @@ public class DefaultPostTransferService extends AbstractCellComponent implements
private Consumer<MoverInfoMessage> _kafkaSender = (s) -> {
};

private TransferLifeCycle transferLifeCycle;

@Required
public void setBillingStub(CellStub billing) {
_billing = billing;
Expand All @@ -92,6 +96,10 @@ public void setKafkaTemplate(KafkaTemplate kafkaTemplate) {
_kafkaSender = kafkaTemplate::sendDefault;
}

public void setTransferLifeCycle(TransferLifeCycle transferLifeCycle) {
this.transferLifeCycle = transferLifeCycle;
}

public void init() {
_door = new CellStub(getCellEndpoint());
}
Expand Down Expand Up @@ -229,6 +237,12 @@ public void sendFinished(Mover<?> mover, MoverInfoMessage moverInfoMessage) {
finished.setReply(mover.getErrorCode(), mover.getErrorMessage());
}

mover.getLocalEndpoint().ifPresent(e ->
transferLifeCycle.onEnd(((IpProtocolInfo) mover.getProtocolInfo()).getSocketAddress(),
e,
mover.getProtocolInfo(),
mover.getSubject()));

_door.notify(mover.getPathToDoor(), finished);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ public abstract class NettyTransferService<P extends ProtocolInfo>

private final List<io.netty.util.concurrent.Future<?>> shutdownFutures = new ArrayList<>();

private TransferLifeCycle transferLifeCycle;

public NettyTransferService(String name) {
this.name = name;
}
Expand Down Expand Up @@ -220,6 +222,14 @@ public void setPortRange(NettyPortRange portRange) {
this.portRange = portRange;
}

public void setTransferLifeCycle(TransferLifeCycle transferLifeCycle) {
this.transferLifeCycle = transferLifeCycle;
}

public TransferLifeCycle getTransferLifeCycle() {
return transferLifeCycle;
}

public NettyPortRange getPortRange() {
return portRange;
}
Expand Down Expand Up @@ -372,6 +382,9 @@ public void execute()

InetSocketAddress localEndpoint = new InetSocketAddress(localIP, getServerAddress().getPort());
mover.setLocalEndpoint(localEndpoint);
transferLifeCycle.onStart(((IpProtocolInfo)mover.getProtocolInfo()).getSocketAddress(),
localEndpoint, mover.getProtocolInfo(), mover.getSubject());

sendAddressToDoor(mover, localEndpoint);
}

Expand Down
Loading

0 comments on commit 4b7dbd4

Please sign in to comment.