⚠ This page is served via a proxy. Original site: https://github.com
This service does not collect credentials or authentication data.
Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.ignite.internal.codegen.CachePartitionsToReloadMapSerializer;
import org.apache.ignite.internal.codegen.CacheVersionedValueSerializer;
import org.apache.ignite.internal.codegen.CacheWriteSynchronizationModeMessageSerializer;
import org.apache.ignite.internal.codegen.ContinuousRoutineStartResultMessageSerializer;
import org.apache.ignite.internal.codegen.DeploymentModeMessageSerializer;
import org.apache.ignite.internal.codegen.ErrorMessageSerializer;
import org.apache.ignite.internal.codegen.GenerateEncryptionKeyRequestSerializer;
Expand Down Expand Up @@ -428,7 +429,7 @@ public class GridIoMessageFactory implements MessageFactoryProvider {
factory.register((short)131, UserAuthenticateRequestMessage::new, new UserAuthenticateRequestMessageSerializer());
factory.register((short)132, UserAuthenticateResponseMessage::new, new UserAuthenticateResponseMessageSerializer());
factory.register((short)133, ClusterMetricsUpdateMessage::new);
factory.register((short)134, ContinuousRoutineStartResultMessage::new);
factory.register((short)134, ContinuousRoutineStartResultMessage::new, new ContinuousRoutineStartResultMessageSerializer());
factory.register((short)135, LatchAckMessage::new, new LatchAckMessageSerializer());
factory.register((short)157, PartitionUpdateCountersMessage::new);
factory.register((short)162, GenerateEncryptionKeyRequest::new, new GenerateEncryptionKeyRequestSerializer());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,29 @@

package org.apache.ignite.internal.processors.continuous;

import java.nio.ByteBuffer;
import java.util.UUID;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.managers.communication.ErrorMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;

/**
*
*/
public class ContinuousRoutineStartResultMessage implements Message {
/** */
private static final int ERROR_FLAG = 0x01;

/** */
@Order(0)
private UUID routineId;

/** */
private byte[] errBytes;
@Order(value = 1, method = "errorMessage")
private @Nullable ErrorMessage errMsg;

/** */
private byte[] cntrsMapBytes;

/** */
private int flags;
@Order(value = 2, method = "countersMap")
private CachePartitionPartialCountersMap cntrsMap;

/**
*
Expand All @@ -54,128 +50,68 @@ public ContinuousRoutineStartResultMessage() {

/**
* @param routineId Routine ID.
* @param cntrsMapBytes Marshalled {@link CachePartitionPartialCountersMap}.
* @param errBytes Error bytes.
* @param err {@code True} if failed to start routine.
* @param cntrsMap Counters map.
* @param err Error.
*/
ContinuousRoutineStartResultMessage(UUID routineId, byte[] cntrsMapBytes, byte[] errBytes, boolean err) {
ContinuousRoutineStartResultMessage(
UUID routineId,
@Nullable CachePartitionPartialCountersMap cntrsMap,
@Nullable Throwable err
) {
this.routineId = routineId;
this.cntrsMapBytes = cntrsMapBytes;
this.errBytes = errBytes;
this.cntrsMap = cntrsMap;

if (err)
flags |= ERROR_FLAG;
if (err != null)
errMsg = new ErrorMessage(err);
}

/**
* @return Marshalled {@link CachePartitionPartialCountersMap}.
* @return Counters map.
*/
@Nullable byte[] countersMapBytes() {
return cntrsMapBytes;
public @Nullable CachePartitionPartialCountersMap countersMap() {
return cntrsMap;
}

/**
* @return {@code True} if failed to start routine.
* @param cntrsMap Counters map.
*/
boolean error() {
return (flags & ERROR_FLAG) != 0;
public void countersMap(@Nullable CachePartitionPartialCountersMap cntrsMap) {
this.cntrsMap = cntrsMap;
}

/**
* @return Routine ID.
*/
UUID routineId() {
public UUID routineId() {
return routineId;
}

/**
* @return Error bytes.
* @param routineId Routine ID.
*/
@Nullable byte[] errorBytes() {
return errBytes;
public void routineId(UUID routineId) {
this.routineId = routineId;
}

/** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);

if (!writer.isHeaderWritten()) {
if (!writer.writeHeader(directType()))
return false;

writer.onHeaderWritten();
}

switch (writer.state()) {
case 0:
if (!writer.writeByteArray(cntrsMapBytes))
return false;

writer.incrementState();

case 1:
if (!writer.writeByteArray(errBytes))
return false;

writer.incrementState();

case 2:
if (!writer.writeInt(flags))
return false;

writer.incrementState();

case 3:
if (!writer.writeUuid(routineId))
return false;

writer.incrementState();

}

return true;
/**
* @return Error message.
*/
public @Nullable ErrorMessage errorMessage() {
return errMsg;
}

/** {@inheritDoc} */
@Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
reader.setBuffer(buf);

switch (reader.state()) {
case 0:
cntrsMapBytes = reader.readByteArray();

if (!reader.isLastRead())
return false;

reader.incrementState();

case 1:
errBytes = reader.readByteArray();

if (!reader.isLastRead())
return false;

reader.incrementState();

case 2:
flags = reader.readInt();

if (!reader.isLastRead())
return false;

reader.incrementState();

case 3:
routineId = reader.readUuid();

if (!reader.isLastRead())
return false;

reader.incrementState();

}
/**
* @param errMsg Error message.
*/
public void errorMessage(@Nullable ErrorMessage errMsg) {
this.errMsg = errMsg;
}

return true;
/**
* @return Error.
*/
public @Nullable Throwable error() {
return ErrorMessage.error(errMsg);
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1545,7 +1545,7 @@ private void processStartRequestV2(final AffinityTopologyVersion topVer,

IgnitePredicate<ClusterNode> nodeFilter = null;

byte[] cntrs = null;
CachePartitionPartialCountersMap cntrsMap = null;

if (reqData.nodeFilterBytes() != null) {
try {
Expand Down Expand Up @@ -1621,12 +1621,8 @@ private void processStartRequestV2(final AffinityTopologyVersion topVer,
if (proc != null) {
GridCacheAdapter cache = ctx.cache().internalCache(hnd.cacheName());

if (cache != null && cache.context().userCache()) {
CachePartitionPartialCountersMap cntrsMap =
cache.context().topology().localUpdateCounters(false);

cntrs = U.marshal(marsh, cntrsMap);
}
if (cache != null && cache.context().userCache())
cntrsMap = cache.context().topology().localUpdateCounters(false);
}
}
}
Expand All @@ -1639,40 +1635,25 @@ private void processStartRequestV2(final AffinityTopologyVersion topVer,
}
}

sendMessageStartResult(snd, msg.routineId(), cntrs, err);
sendMessageStartResult(snd, msg.routineId(), cntrsMap, err);
}
});
}

/**
* @param node Target node.
* @param routineId Routine ID.
* @param cntrsMapBytes Marshalled {@link CachePartitionPartialCountersMap}.
* @param cntrsMap Counters map.
* @param err Start error if any.
*/
private void sendMessageStartResult(final ClusterNode node,
final UUID routineId,
byte[] cntrsMapBytes,
CachePartitionPartialCountersMap cntrsMap,
@Nullable final Exception err
) {
byte[] errBytes = null;

if (err != null) {
try {
errBytes = U.marshal(marsh, err);
}
catch (Exception e) {
U.error(log, "Failed to marshal routine start error: " + e, e);
}
}

ContinuousRoutineStartResultMessage msg = new ContinuousRoutineStartResultMessage(routineId,
cntrsMapBytes,
errBytes,
err != null);

try {
ctx.io().sendToGridTopic(node, TOPIC_CONTINUOUS, msg, SYSTEM_POOL);
ctx.io().sendToGridTopic(node, TOPIC_CONTINUOUS, new ContinuousRoutineStartResultMessage(routineId, cntrsMap, err),
SYSTEM_POOL);
}
catch (ClusterTopologyCheckedException e) {
if (log.isDebugEnabled())
Expand Down Expand Up @@ -2561,7 +2542,7 @@ private class StartFuture extends GridFutureAdapter<UUID> {

resCollect = new DiscoveryMessageResultsCollector<ContinuousRoutineStartResultMessage, RoutineRegisterResults>(ctx) {
@Override protected RoutineRegisterResults createResult(Map<UUID, NodeMessage<ContinuousRoutineStartResultMessage>> rcvd) {
Map<UUID, Exception> errs = null;
Map<UUID, Throwable> errs = null;
Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode = null;

for (Map.Entry<UUID, NodeMessage<ContinuousRoutineStartResultMessage>> entry : rcvd.entrySet()) {
Expand All @@ -2570,48 +2551,22 @@ private class StartFuture extends GridFutureAdapter<UUID> {
if (msg == null)
continue;

if (msg.error()) {
byte[] errBytes = msg.errorBytes();

Exception err = null;

if (errBytes != null) {
try {
err = U.unmarshal(marsh, errBytes, U.resolveClassLoader(ctx.config()));
}
catch (Exception e) {
U.warn(log, "Failed to unmarhal continuous routine start error: " + e);
}
}

if (err == null) {
err = new IgniteCheckedException("Failed to start continuous " +
"routine on node: " + entry.getKey());
}
Throwable err = msg.error();

if (err != null) {
if (errs == null)
errs = new HashMap<>();

errs.put(entry.getKey(), err);
}
else {
byte[] cntrsMapBytes = msg.countersMapBytes();

if (cntrsMapBytes != null) {
try {
CachePartitionPartialCountersMap cntrsMap = U.unmarshal(
marsh,
cntrsMapBytes,
U.resolveClassLoader(ctx.config()));
CachePartitionPartialCountersMap cntrsMap = msg.countersMap();

if (cntrsPerNode == null)
cntrsPerNode = new HashMap<>();
if (cntrsMap != null) {
if (cntrsPerNode == null)
cntrsPerNode = new HashMap<>();

cntrsPerNode.put(entry.getKey(), CachePartitionPartialCountersMap.toCountersMap(cntrsMap));
}
catch (Exception e) {
U.warn(log, "Failed to unmarhal continuous query update counters: " + e);
}
cntrsPerNode.put(entry.getKey(), CachePartitionPartialCountersMap.toCountersMap(cntrsMap));
}
}
}
Expand All @@ -2637,7 +2592,7 @@ private class StartFuture extends GridFutureAdapter<UUID> {
*/
private void onAllRemoteRegistered(
AffinityTopologyVersion topVer,
@Nullable Map<UUID, ? extends Exception> errs,
@Nullable Map<UUID, ? extends Throwable> errs,
Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode,
Map<Integer, T2<Long, Long>> cntrs) {
try {
Expand All @@ -2661,7 +2616,7 @@ private void onAllRemoteRegistered(
onRemoteRegistered();
}
else {
Exception firstEx = F.first(errs.values());
Throwable firstEx = F.first(errs.values());

onDone(firstEx);

Expand Down Expand Up @@ -2729,7 +2684,7 @@ private static class RoutineRegisterResults {
private final AffinityTopologyVersion topVer;

/** */
private final Map<UUID, ? extends Exception> errs;
private final Map<UUID, ? extends Throwable> errs;

/** */
private final Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode;
Expand All @@ -2740,7 +2695,7 @@ private static class RoutineRegisterResults {
* @param cntrsPerNode Update counters.
*/
RoutineRegisterResults(AffinityTopologyVersion topVer,
Map<UUID, ? extends Exception> errs,
Map<UUID, ? extends Throwable> errs,
Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode) {
this.topVer = topVer;
this.errs = errs;
Expand Down