diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 8a29b57768fcb..a5cd26184d430 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -38,6 +38,7 @@ import org.apache.ignite.internal.codegen.CachePartitionsToReloadMapSerializer; import org.apache.ignite.internal.codegen.CacheVersionedValueSerializer; import org.apache.ignite.internal.codegen.CacheWriteSynchronizationModeMessageSerializer; +import org.apache.ignite.internal.codegen.ContinuousRoutineStartResultMessageSerializer; import org.apache.ignite.internal.codegen.DeploymentModeMessageSerializer; import org.apache.ignite.internal.codegen.ErrorMessageSerializer; import org.apache.ignite.internal.codegen.GenerateEncryptionKeyRequestSerializer; @@ -428,7 +429,7 @@ public class GridIoMessageFactory implements MessageFactoryProvider { factory.register((short)131, UserAuthenticateRequestMessage::new, new UserAuthenticateRequestMessageSerializer()); factory.register((short)132, UserAuthenticateResponseMessage::new, new UserAuthenticateResponseMessageSerializer()); factory.register((short)133, ClusterMetricsUpdateMessage::new); - factory.register((short)134, ContinuousRoutineStartResultMessage::new); + factory.register((short)134, ContinuousRoutineStartResultMessage::new, new ContinuousRoutineStartResultMessageSerializer()); factory.register((short)135, LatchAckMessage::new, new LatchAckMessageSerializer()); factory.register((short)157, PartitionUpdateCountersMessage::new); factory.register((short)162, GenerateEncryptionKeyRequest::new, new GenerateEncryptionKeyRequestSerializer()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineStartResultMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineStartResultMessage.java index ad0a47c770b8e..aeebdae87349e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineStartResultMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineStartResultMessage.java @@ -17,13 +17,12 @@ package org.apache.ignite.internal.processors.continuous; -import java.nio.ByteBuffer; import java.util.UUID; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.communication.ErrorMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.plugin.extensions.communication.MessageReader; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.jetbrains.annotations.Nullable; /** @@ -31,19 +30,16 @@ */ public class ContinuousRoutineStartResultMessage implements Message { /** */ - private static final int ERROR_FLAG = 0x01; - - /** */ + @Order(0) private UUID routineId; /** */ - private byte[] errBytes; + @Order(value = 1, method = "errorMessage") + private @Nullable ErrorMessage errMsg; /** */ - private byte[] cntrsMapBytes; - - /** */ - private int flags; + @Order(value = 2, method = "countersMap") + private CachePartitionPartialCountersMap cntrsMap; /** * @@ -54,128 +50,68 @@ public ContinuousRoutineStartResultMessage() { /** * @param routineId Routine ID. - * @param cntrsMapBytes Marshalled {@link CachePartitionPartialCountersMap}. - * @param errBytes Error bytes. - * @param err {@code True} if failed to start routine. + * @param cntrsMap Counters map. + * @param err Error. */ - ContinuousRoutineStartResultMessage(UUID routineId, byte[] cntrsMapBytes, byte[] errBytes, boolean err) { + ContinuousRoutineStartResultMessage( + UUID routineId, + @Nullable CachePartitionPartialCountersMap cntrsMap, + @Nullable Throwable err + ) { this.routineId = routineId; - this.cntrsMapBytes = cntrsMapBytes; - this.errBytes = errBytes; + this.cntrsMap = cntrsMap; - if (err) - flags |= ERROR_FLAG; + if (err != null) + errMsg = new ErrorMessage(err); } /** - * @return Marshalled {@link CachePartitionPartialCountersMap}. + * @return Counters map. */ - @Nullable byte[] countersMapBytes() { - return cntrsMapBytes; + public @Nullable CachePartitionPartialCountersMap countersMap() { + return cntrsMap; } /** - * @return {@code True} if failed to start routine. + * @param cntrsMap Counters map. */ - boolean error() { - return (flags & ERROR_FLAG) != 0; + public void countersMap(@Nullable CachePartitionPartialCountersMap cntrsMap) { + this.cntrsMap = cntrsMap; } /** * @return Routine ID. */ - UUID routineId() { + public UUID routineId() { return routineId; } /** - * @return Error bytes. + * @param routineId Routine ID. */ - @Nullable byte[] errorBytes() { - return errBytes; + public void routineId(UUID routineId) { + this.routineId = routineId; } - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType())) - return false; - - writer.onHeaderWritten(); - } - - switch (writer.state()) { - case 0: - if (!writer.writeByteArray(cntrsMapBytes)) - return false; - - writer.incrementState(); - - case 1: - if (!writer.writeByteArray(errBytes)) - return false; - - writer.incrementState(); - - case 2: - if (!writer.writeInt(flags)) - return false; - - writer.incrementState(); - - case 3: - if (!writer.writeUuid(routineId)) - return false; - - writer.incrementState(); - - } - - return true; + /** + * @return Error message. + */ + public @Nullable ErrorMessage errorMessage() { + return errMsg; } - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - switch (reader.state()) { - case 0: - cntrsMapBytes = reader.readByteArray(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 1: - errBytes = reader.readByteArray(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 2: - flags = reader.readInt(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 3: - routineId = reader.readUuid(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - } + /** + * @param errMsg Error message. + */ + public void errorMessage(@Nullable ErrorMessage errMsg) { + this.errMsg = errMsg; + } - return true; + /** + * @return Error. + */ + public @Nullable Throwable error() { + return ErrorMessage.error(errMsg); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index df98fc299d4c8..2ae5fa57f5c07 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -1545,7 +1545,7 @@ private void processStartRequestV2(final AffinityTopologyVersion topVer, IgnitePredicate nodeFilter = null; - byte[] cntrs = null; + CachePartitionPartialCountersMap cntrsMap = null; if (reqData.nodeFilterBytes() != null) { try { @@ -1621,12 +1621,8 @@ private void processStartRequestV2(final AffinityTopologyVersion topVer, if (proc != null) { GridCacheAdapter cache = ctx.cache().internalCache(hnd.cacheName()); - if (cache != null && cache.context().userCache()) { - CachePartitionPartialCountersMap cntrsMap = - cache.context().topology().localUpdateCounters(false); - - cntrs = U.marshal(marsh, cntrsMap); - } + if (cache != null && cache.context().userCache()) + cntrsMap = cache.context().topology().localUpdateCounters(false); } } } @@ -1639,7 +1635,7 @@ private void processStartRequestV2(final AffinityTopologyVersion topVer, } } - sendMessageStartResult(snd, msg.routineId(), cntrs, err); + sendMessageStartResult(snd, msg.routineId(), cntrsMap, err); } }); } @@ -1647,32 +1643,17 @@ private void processStartRequestV2(final AffinityTopologyVersion topVer, /** * @param node Target node. * @param routineId Routine ID. - * @param cntrsMapBytes Marshalled {@link CachePartitionPartialCountersMap}. + * @param cntrsMap Counters map. * @param err Start error if any. */ private void sendMessageStartResult(final ClusterNode node, final UUID routineId, - byte[] cntrsMapBytes, + CachePartitionPartialCountersMap cntrsMap, @Nullable final Exception err ) { - byte[] errBytes = null; - - if (err != null) { - try { - errBytes = U.marshal(marsh, err); - } - catch (Exception e) { - U.error(log, "Failed to marshal routine start error: " + e, e); - } - } - - ContinuousRoutineStartResultMessage msg = new ContinuousRoutineStartResultMessage(routineId, - cntrsMapBytes, - errBytes, - err != null); - try { - ctx.io().sendToGridTopic(node, TOPIC_CONTINUOUS, msg, SYSTEM_POOL); + ctx.io().sendToGridTopic(node, TOPIC_CONTINUOUS, new ContinuousRoutineStartResultMessage(routineId, cntrsMap, err), + SYSTEM_POOL); } catch (ClusterTopologyCheckedException e) { if (log.isDebugEnabled()) @@ -2561,7 +2542,7 @@ private class StartFuture extends GridFutureAdapter { resCollect = new DiscoveryMessageResultsCollector(ctx) { @Override protected RoutineRegisterResults createResult(Map> rcvd) { - Map errs = null; + Map errs = null; Map>> cntrsPerNode = null; for (Map.Entry> entry : rcvd.entrySet()) { @@ -2570,48 +2551,22 @@ private class StartFuture extends GridFutureAdapter { if (msg == null) continue; - if (msg.error()) { - byte[] errBytes = msg.errorBytes(); - - Exception err = null; - - if (errBytes != null) { - try { - err = U.unmarshal(marsh, errBytes, U.resolveClassLoader(ctx.config())); - } - catch (Exception e) { - U.warn(log, "Failed to unmarhal continuous routine start error: " + e); - } - } - - if (err == null) { - err = new IgniteCheckedException("Failed to start continuous " + - "routine on node: " + entry.getKey()); - } + Throwable err = msg.error(); + if (err != null) { if (errs == null) errs = new HashMap<>(); errs.put(entry.getKey(), err); } else { - byte[] cntrsMapBytes = msg.countersMapBytes(); - - if (cntrsMapBytes != null) { - try { - CachePartitionPartialCountersMap cntrsMap = U.unmarshal( - marsh, - cntrsMapBytes, - U.resolveClassLoader(ctx.config())); + CachePartitionPartialCountersMap cntrsMap = msg.countersMap(); - if (cntrsPerNode == null) - cntrsPerNode = new HashMap<>(); + if (cntrsMap != null) { + if (cntrsPerNode == null) + cntrsPerNode = new HashMap<>(); - cntrsPerNode.put(entry.getKey(), CachePartitionPartialCountersMap.toCountersMap(cntrsMap)); - } - catch (Exception e) { - U.warn(log, "Failed to unmarhal continuous query update counters: " + e); - } + cntrsPerNode.put(entry.getKey(), CachePartitionPartialCountersMap.toCountersMap(cntrsMap)); } } } @@ -2637,7 +2592,7 @@ private class StartFuture extends GridFutureAdapter { */ private void onAllRemoteRegistered( AffinityTopologyVersion topVer, - @Nullable Map errs, + @Nullable Map errs, Map>> cntrsPerNode, Map> cntrs) { try { @@ -2661,7 +2616,7 @@ private void onAllRemoteRegistered( onRemoteRegistered(); } else { - Exception firstEx = F.first(errs.values()); + Throwable firstEx = F.first(errs.values()); onDone(firstEx); @@ -2729,7 +2684,7 @@ private static class RoutineRegisterResults { private final AffinityTopologyVersion topVer; /** */ - private final Map errs; + private final Map errs; /** */ private final Map>> cntrsPerNode; @@ -2740,7 +2695,7 @@ private static class RoutineRegisterResults { * @param cntrsPerNode Update counters. */ RoutineRegisterResults(AffinityTopologyVersion topVer, - Map errs, + Map errs, Map>> cntrsPerNode) { this.topVer = topVer; this.errs = errs;