diff --git a/.mise.toml b/.mise.toml deleted file mode 100644 index 6daaf4e4cb..0000000000 --- a/.mise.toml +++ /dev/null @@ -1,5 +0,0 @@ -[tools] -java = "temurin-17" - -[env] -JAVA_HOME = "{{exec(command='mise where java')}}" diff --git a/examples/src/main/java/io/dapr/examples/pubsub/stream/README.md b/examples/src/main/java/io/dapr/examples/pubsub/stream/README.md index 7916f62783..c31607f753 100644 --- a/examples/src/main/java/io/dapr/examples/pubsub/stream/README.md +++ b/examples/src/main/java/io/dapr/examples/pubsub/stream/README.md @@ -56,8 +56,8 @@ public class Subscriber { public static void main(String[] args) throws Exception { try (var client = new DaprClientBuilder().buildPreviewClient()) { - // Subscribe to events - receives raw String data directly - client.subscribeToEvents(PUBSUB_NAME, topicName, TypeRef.STRING) + // Subscribe to topic - receives raw String data directly + client.subscribeToTopic(PUBSUB_NAME, topicName, TypeRef.STRING) .doOnNext(message -> { System.out.println("Subscriber got: " + message); }) @@ -79,8 +79,8 @@ public class SubscriberCloudEvent { public static void main(String[] args) throws Exception { try (var client = new DaprClientBuilder().buildPreviewClient()) { - // Subscribe to events - receives CloudEvent with full metadata - client.subscribeToEvents(PUBSUB_NAME, topicName, new TypeRef>() {}) + // Subscribe to topic - receives CloudEvent with full metadata + client.subscribeToTopic(PUBSUB_NAME, topicName, new TypeRef>() {}) .doOnNext(cloudEvent -> { System.out.println("Received CloudEvent:"); System.out.println(" ID: " + cloudEvent.getId()); @@ -101,7 +101,7 @@ public class SubscriberCloudEvent { You can also pass metadata to the subscription, for example to enable raw payload mode: ```java -client.subscribeToEvents(PUBSUB_NAME, topicName, TypeRef.STRING, Map.of("rawPayload", "true")) +client.subscribeToTopic(PUBSUB_NAME, topicName, TypeRef.STRING, Map.of("rawPayload", "true")) .doOnNext(message -> { System.out.println("Subscriber got: " + message); }) diff --git a/examples/src/main/java/io/dapr/examples/pubsub/stream/Subscriber.java b/examples/src/main/java/io/dapr/examples/pubsub/stream/Subscriber.java index 59be7beb2c..e9456dc24c 100644 --- a/examples/src/main/java/io/dapr/examples/pubsub/stream/Subscriber.java +++ b/examples/src/main/java/io/dapr/examples/pubsub/stream/Subscriber.java @@ -47,8 +47,8 @@ public static void main(String[] args) throws Exception { try (var client = new DaprClientBuilder().buildPreviewClient()) { System.out.println("Subscribing to topic: " + topicName); - // Subscribe to events - receives raw String data directly - client.subscribeToEvents(PUBSUB_NAME, topicName, TypeRef.STRING) + // Subscribe to topic - receives raw String data directly + client.subscribeToTopic(PUBSUB_NAME, topicName, TypeRef.STRING) .doOnNext(message -> { System.out.println("Subscriber got: " + message); }) diff --git a/examples/src/main/java/io/dapr/examples/pubsub/stream/SubscriberCloudEvent.java b/examples/src/main/java/io/dapr/examples/pubsub/stream/SubscriberCloudEvent.java index a76603fdbb..5f52d51d04 100644 --- a/examples/src/main/java/io/dapr/examples/pubsub/stream/SubscriberCloudEvent.java +++ b/examples/src/main/java/io/dapr/examples/pubsub/stream/SubscriberCloudEvent.java @@ -50,9 +50,9 @@ public static void main(String[] args) throws Exception { try (var client = new DaprClientBuilder().buildPreviewClient()) { System.out.println("Subscribing to topic: " + topicName + " (CloudEvent mode)"); - // Subscribe to events - receives CloudEvent with full metadata + // Subscribe to topic - receives CloudEvent with full metadata // Use TypeRef> to get CloudEvent wrapper with metadata - client.subscribeToEvents(PUBSUB_NAME, topicName, new TypeRef>() {}) + client.subscribeToTopic(PUBSUB_NAME, topicName, new TypeRef>() {}) .doOnNext(cloudEvent -> { System.out.println("Received CloudEvent:"); System.out.println(" ID: " + cloudEvent.getId()); diff --git a/mise.toml b/mise.toml new file mode 100644 index 0000000000..262fd088c7 --- /dev/null +++ b/mise.toml @@ -0,0 +1,2 @@ +[tools] +java = "temurin-17" diff --git a/sdk-tests/src/test/java/io/dapr/it/pubsub/stream/PubSubStreamIT.java b/sdk-tests/src/test/java/io/dapr/it/pubsub/stream/PubSubStreamIT.java index a1d1b4fe33..334bc5232a 100644 --- a/sdk-tests/src/test/java/io/dapr/it/pubsub/stream/PubSubStreamIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/pubsub/stream/PubSubStreamIT.java @@ -151,8 +151,8 @@ public void testPubSubFlux() throws Exception { Set messages = Collections.synchronizedSet(new HashSet<>()); - // subscribeToEvents now returns Flux directly (raw data) - var disposable = previewClient.subscribeToEvents(PUBSUB_NAME, TOPIC_NAME_FLUX, TypeRef.STRING) + // subscribeToTopic returns Flux directly (raw data) + var disposable = previewClient.subscribeToTopic(PUBSUB_NAME, TOPIC_NAME_FLUX, TypeRef.STRING) .doOnNext(rawMessage -> { // rawMessage is String directly if (rawMessage.contains(runId)) { @@ -196,7 +196,7 @@ public void testPubSubCloudEvent() throws Exception { Set messageIds = Collections.synchronizedSet(new HashSet<>()); // Use TypeRef> to receive full CloudEvent with metadata - var disposable = previewClient.subscribeToEvents(PUBSUB_NAME, TOPIC_NAME_CLOUDEVENT, new TypeRef>(){}) + var disposable = previewClient.subscribeToTopic(PUBSUB_NAME, TOPIC_NAME_CLOUDEVENT, new TypeRef>(){}) .doOnNext(cloudEvent -> { if (cloudEvent.getData() != null && cloudEvent.getData().contains(runId)) { messageIds.add(cloudEvent.getId()); @@ -241,8 +241,8 @@ public void testPubSubRawPayload() throws Exception { Set messages = Collections.synchronizedSet(new HashSet<>()); Map metadata = Map.of("rawPayload", "true"); - // Use subscribeToEvents with rawPayload metadata - var disposable = previewClient.subscribeToEvents(PUBSUB_NAME, TOPIC_NAME_RAWPAYLOAD, TypeRef.STRING, metadata) + // Use subscribeToTopic with rawPayload metadata + var disposable = previewClient.subscribeToTopic(PUBSUB_NAME, TOPIC_NAME_RAWPAYLOAD, TypeRef.STRING, metadata) .doOnNext(rawMessage -> { if (rawMessage.contains(runId)) { messages.add(rawMessage); diff --git a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java index 489db11e60..cf82c5fb73 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java @@ -487,16 +487,34 @@ public Subscription subscribeToEvents( /** * {@inheritDoc} */ + @Deprecated @Override public Flux subscribeToEvents(String pubsubName, String topic, TypeRef type) { - return subscribeToEvents(pubsubName, topic, type, null); + return subscribeToTopic(pubsubName, topic, type); } /** * {@inheritDoc} */ + @Deprecated @Override public Flux subscribeToEvents(String pubsubName, String topic, TypeRef type, Map metadata) { + return subscribeToTopic(pubsubName, topic, type, metadata); + } + + /** + * {@inheritDoc} + */ + @Override + public Flux subscribeToTopic(String pubsubName, String topic, TypeRef type) { + return subscribeToTopic(pubsubName, topic, type, null); + } + + /** + * {@inheritDoc} + */ + @Override + public Flux subscribeToTopic(String pubsubName, String topic, TypeRef type, Map metadata) { DaprPubsubProtos.SubscribeTopicEventsRequestInitialAlpha1.Builder initialRequestBuilder = DaprPubsubProtos.SubscribeTopicEventsRequestInitialAlpha1.newBuilder() .setTopic(topic) diff --git a/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java b/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java index dd92d936a2..b78c1eccf8 100644 --- a/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java +++ b/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java @@ -272,12 +272,39 @@ Mono> publishEvents(String pubsubName, String topicNa * @param type Type for object deserialization. * @param Type of object deserialization. * @return An active subscription. - * @deprecated Use {@link #subscribeToEvents(String, String, TypeRef)} instead for a more reactive approach. + * @deprecated Use {@link #subscribeToTopic(String, String, TypeRef)} instead for a more reactive approach. */ @Deprecated Subscription subscribeToEvents( String pubsubName, String topic, SubscriptionListener listener, TypeRef type); + /** + * Subscribe to pubsub events via streaming using Project Reactor Flux. + * + * @param pubsubName Name of the pubsub component. + * @param topic Name of the topic to subscribe to. + * @param type Type for object deserialization. + * @param Type of the event payload. + * @return A Flux of deserialized event payloads. + * @deprecated Use {@link #subscribeToTopic(String, String, TypeRef)} instead. + */ + @Deprecated + Flux subscribeToEvents(String pubsubName, String topic, TypeRef type); + + /** + * Subscribe to pubsub events via streaming using Project Reactor Flux with metadata support. + * + * @param pubsubName Name of the pubsub component. + * @param topic Name of the topic to subscribe to. + * @param type Type for object deserialization. + * @param metadata Subscription metadata (e.g., {"rawPayload": "true"}). + * @param Type of the event payload. + * @return A Flux of deserialized event payloads. + * @deprecated Use {@link #subscribeToTopic(String, String, TypeRef, Map)} instead. + */ + @Deprecated + Flux subscribeToEvents(String pubsubName, String topic, TypeRef type, Map metadata); + /** * Subscribe to pubsub events via streaming using Project Reactor Flux. * @@ -293,12 +320,12 @@ Subscription subscribeToEvents( * @return A Flux of deserialized event payloads. * @param Type of the event payload. */ - Flux subscribeToEvents(String pubsubName, String topic, TypeRef type); + Flux subscribeToTopic(String pubsubName, String topic, TypeRef type); /** * Subscribe to pubsub events via streaming using Project Reactor Flux with metadata support. * - *

If metadata is null or empty, this method delegates to {@link #subscribeToEvents(String, String, TypeRef)}. + *

If metadata is null or empty, this method delegates to {@link #subscribeToTopic(String, String, TypeRef)}. * Use metadata {@code {"rawPayload": "true"}} for raw payload subscriptions where Dapr * delivers messages without CloudEvent wrapping. * @@ -309,7 +336,7 @@ Subscription subscribeToEvents( * @return A Flux of deserialized event payloads. * @param Type of the event payload. */ - Flux subscribeToEvents(String pubsubName, String topic, TypeRef type, Map metadata); + Flux subscribeToTopic(String pubsubName, String topic, TypeRef type, Map metadata); /* * Converse with an LLM. diff --git a/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java b/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java index bd734692da..4c4955cf1f 100644 --- a/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java @@ -640,8 +640,8 @@ public void onCompleted() { final AtomicInteger eventCount = new AtomicInteger(0); final Semaphore gotAll = new Semaphore(0); - // subscribeToEvents now returns Flux directly (raw data) - var disposable = previewClient.subscribeToEvents(pubsubName, topicName, TypeRef.STRING) + // subscribeToTopic returns Flux directly (raw data) + var disposable = previewClient.subscribeToTopic(pubsubName, topicName, TypeRef.STRING) .doOnNext(rawData -> { // rawData is String directly, not CloudEvent assertEquals(data, rawData); @@ -728,7 +728,180 @@ public void onCompleted() { final Semaphore gotAll = new Semaphore(0); Map metadata = Map.of("rawPayload", "true"); - // Use subscribeToEvents with rawPayload metadata + // Use subscribeToTopic with rawPayload metadata + var disposable = previewClient.subscribeToTopic(pubsubName, topicName, TypeRef.STRING, metadata) + .doOnNext(rawData -> { + assertEquals(data, rawData); + assertTrue(rawData instanceof String); + + int count = eventCount.incrementAndGet(); + + if (count >= numEvents) { + gotAll.release(); + } + }) + .subscribe(); + + gotAll.acquire(); + disposable.dispose(); + + assertEquals(numEvents, eventCount.get()); + + // Verify metadata was passed to gRPC request + assertNotNull(capturedMetadata.get()); + assertEquals("true", capturedMetadata.get().get("rawPayload")); + } + + @SuppressWarnings("deprecation") + @Test + public void deprecatedSubscribeToEventsFluxTest() throws Exception { + var numEvents = 10; + var pubsubName = "pubsubName"; + var topicName = "topicName"; + var data = "my message"; + var started = new Semaphore(0); + + doAnswer((Answer>) invocation -> { + StreamObserver observer = + (StreamObserver) invocation.getArguments()[0]; + + var emitterThread = new Thread(() -> { + try { + started.acquire(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + observer.onNext(DaprPubsubProtos.SubscribeTopicEventsResponseAlpha1.getDefaultInstance()); + + for (int i = 0; i < numEvents; i++) { + DaprPubsubProtos.SubscribeTopicEventsResponseAlpha1 response = + DaprPubsubProtos.SubscribeTopicEventsResponseAlpha1.newBuilder() + .setEventMessage(DaprAppCallbackProtos.TopicEventRequest.newBuilder() + .setId(Integer.toString(i)) + .setPubsubName(pubsubName) + .setTopic(topicName) + .setData(ByteString.copyFromUtf8("\"" + data + "\"")) + .setDataContentType("application/json") + .build()) + .build(); + observer.onNext(response); + } + + observer.onCompleted(); + }); + + emitterThread.start(); + + return new StreamObserver<>() { + @Override + public void onNext(DaprPubsubProtos.SubscribeTopicEventsRequestAlpha1 subscribeTopicEventsRequestAlpha1) { + started.release(); + } + + @Override + public void onError(Throwable throwable) { + // No-op + } + + @Override + public void onCompleted() { + // No-op + } + }; + }).when(daprStub).subscribeTopicEventsAlpha1(any(StreamObserver.class)); + + final AtomicInteger eventCount = new AtomicInteger(0); + final Semaphore gotAll = new Semaphore(0); + + // Test deprecated subscribeToEvents method (delegates to subscribeToTopic) + var disposable = previewClient.subscribeToEvents(pubsubName, topicName, TypeRef.STRING) + .doOnNext(rawData -> { + assertEquals(data, rawData); + assertTrue(rawData instanceof String); + + int count = eventCount.incrementAndGet(); + + if (count >= numEvents) { + gotAll.release(); + } + }) + .subscribe(); + + gotAll.acquire(); + disposable.dispose(); + + assertEquals(numEvents, eventCount.get()); + } + + @SuppressWarnings("deprecation") + @Test + public void deprecatedSubscribeToEventsWithMetadataTest() throws Exception { + var numEvents = 10; + var pubsubName = "pubsubName"; + var topicName = "topicName"; + var data = "my message"; + var started = new Semaphore(0); + var capturedMetadata = new AtomicReference>(); + + doAnswer((Answer>) invocation -> { + StreamObserver observer = + (StreamObserver) invocation.getArguments()[0]; + + var emitterThread = new Thread(() -> { + try { + started.acquire(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + observer.onNext(DaprPubsubProtos.SubscribeTopicEventsResponseAlpha1.getDefaultInstance()); + + for (int i = 0; i < numEvents; i++) { + DaprPubsubProtos.SubscribeTopicEventsResponseAlpha1 response = + DaprPubsubProtos.SubscribeTopicEventsResponseAlpha1.newBuilder() + .setEventMessage(DaprAppCallbackProtos.TopicEventRequest.newBuilder() + .setId(Integer.toString(i)) + .setPubsubName(pubsubName) + .setTopic(topicName) + .setData(ByteString.copyFromUtf8("\"" + data + "\"")) + .setDataContentType("application/json") + .build()) + .build(); + observer.onNext(response); + } + + observer.onCompleted(); + }); + + emitterThread.start(); + + return new StreamObserver<>() { + @Override + public void onNext(DaprPubsubProtos.SubscribeTopicEventsRequestAlpha1 request) { + if (request.hasInitialRequest()) { + capturedMetadata.set(request.getInitialRequest().getMetadataMap()); + } + started.release(); + } + + @Override + public void onError(Throwable throwable) { + // No-op + } + + @Override + public void onCompleted() { + // No-op + } + }; + }).when(daprStub).subscribeTopicEventsAlpha1(any(StreamObserver.class)); + + final AtomicInteger eventCount = new AtomicInteger(0); + final Semaphore gotAll = new Semaphore(0); + Map metadata = Map.of("rawPayload", "true"); + + // Test deprecated subscribeToEvents method with metadata (delegates to subscribeToTopic) var disposable = previewClient.subscribeToEvents(pubsubName, topicName, TypeRef.STRING, metadata) .doOnNext(rawData -> { assertEquals(data, rawData);