⚠ This page is served via a proxy. Original site: https://github.com
This service does not collect credentials or authentication data.
Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/beam_PreCommit_Java_GCP_IO_Direct.yml
Original file line number Diff line number Diff line change
Expand Up @@ -153,4 +153,4 @@ jobs:
uses: codecov/codecov-action@v3
with:
file: ${{ steps.jacoco_report_path.outputs.path }}
flags: java
flags: java
Original file line number Diff line number Diff line change
Expand Up @@ -613,8 +613,6 @@ class BeamModulePlugin implements Plugin<Project> {
def google_ads_version = "33.0.0"
def google_clients_version = "2.0.0"
def google_cloud_bigdataoss_version = "2.2.26"
// [bomupgrader] determined by: com.google.cloud:google-cloud-spanner, consistent with: google_cloud_platform_libraries_bom
def google_cloud_spanner_version = "6.104.0"
def google_code_gson_version = "2.10.1"
def google_oauth_clients_version = "1.34.1"
// [bomupgrader] determined by: io.grpc:grpc-netty, consistent with: google_cloud_platform_libraries_bom
Expand Down Expand Up @@ -762,10 +760,7 @@ class BeamModulePlugin implements Plugin<Project> {
// libraries-bom version on sdks/java/container/license_scripts/dep_urls_java.yaml
google_cloud_platform_libraries_bom : "com.google.cloud:libraries-bom:26.75.0",
google_cloud_secret_manager : "com.google.cloud:google-cloud-secretmanager", // google_cloud_platform_libraries_bom sets version
// TODO(#35868) remove pinned google_cloud_spanner_bom after tests or upstream fixed
google_cloud_spanner_bom : "com.google.cloud:google-cloud-spanner-bom:$google_cloud_spanner_version",
google_cloud_spanner : "com.google.cloud:google-cloud-spanner", // google_cloud_platform_libraries_bom sets version
google_cloud_spanner_test : "com.google.cloud:google-cloud-spanner:$google_cloud_spanner_version:tests",
google_cloud_tink : "com.google.crypto.tink:tink:1.19.0",
google_cloud_vertexai : "com.google.cloud:google-cloud-vertexai", // google_cloud_platform_libraries_bom sets version
google_code_gson : "com.google.code.gson:gson:$google_code_gson_version",
Expand Down
14 changes: 2 additions & 12 deletions sdks/java/bom/gcp/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,7 @@ apply from: '../common.gradle'

dependencies {
api platform(project(":sdks:java:bom"))
api platform(project.library.java.google_cloud_spanner_bom)
api platform(project.library.java.google_cloud_platform_libraries_bom) {
// TODO(https://github.com/apache/beam/issues/37328) remove exclude and google_cloud_spanner_bom after upstream and/or tests fixed
exclude group: "com.google.cloud", module: "google-cloud-spanner"
exclude group: "com.google.api.grpc", module: "proto-google-cloud-spanner-v1"
exclude group: "com.google.api.grpc", module: "proto-google-cloud-spanner-admin-instance-v1"
exclude group: "com.google.api.grpc", module: "proto-google-cloud-spanner-admin-database-v1"
exclude group: "com.google.api.grpc", module: "grpc-google-cloud-spanner-v1"
exclude group: "com.google.api.grpc", module: "grpc-google-cloud-spanner-admin-instance-v1"
exclude group: "com.google.api.grpc", module: "grpc-google-cloud-spanner-admin-database-v1"
}
api platform(project.library.java.google_cloud_platform_libraries_bom)
constraints {
api project.library.java.guava
}
Expand All @@ -42,4 +32,4 @@ publishing {
artifactId = 'beam-sdks-java-google-cloud-platform-bom'
}
}
}
}
16 changes: 3 additions & 13 deletions sdks/java/io/google-cloud-platform/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,7 @@ description = "Apache Beam :: SDKs :: Java :: IO :: Google Cloud Platform"
ext.summary = "IO library to read and write Google Cloud Platform systems from Beam."

dependencies {
implementation(enforcedPlatform(library.java.google_cloud_platform_libraries_bom)) {
// TODO(https://github.com/apache/beam/issues/35868) remove exclude after upstream and/or tests fixed
exclude group: "com.google.cloud", module: "google-cloud-spanner"
exclude group: "com.google.api.grpc", module: "proto-google-cloud-spanner-v1"
exclude group: "com.google.api.grpc", module: "proto-google-cloud-spanner-admin-instance-v1"
exclude group: "com.google.api.grpc", module: "proto-google-cloud-spanner-admin-database-v1"
exclude group: "com.google.api.grpc", module: "grpc-google-cloud-spanner-v1"
exclude group: "com.google.api.grpc", module: "grpc-google-cloud-spanner-admin-instance-v1"
exclude group: "com.google.api.grpc", module: "grpc-google-cloud-spanner-admin-database-v1"
}
implementation(enforcedPlatform(library.java.google_cloud_spanner_bom))
implementation(enforcedPlatform(library.java.google_cloud_platform_libraries_bom))
implementation project(path: ":model:pipeline", configuration: "shadow")
implementation project(":runners:core-java")
implementation project(path: ":sdks:java:core", configuration: "shadow")
Expand Down Expand Up @@ -164,7 +154,7 @@ dependencies {
testImplementation library.java.mockito_core
testRuntimeOnly library.java.mockito_inline
testImplementation library.java.joda_time
testImplementation library.java.google_cloud_spanner_test
testImplementation "com.google.cloud:google-cloud-spanner::tests"
testImplementation library.java.google_cloud_bigtable_emulator
testRuntimeOnly library.java.slf4j_jdk14
// everit_json is needed for Pubsub SchemaTransform that relies on JSON-schema translation.
Expand Down Expand Up @@ -358,4 +348,4 @@ task postCommit {
description = "Integration tests of GCP connectors using the DirectRunner."
dependsOn integrationTest
dependsOn integrationTestKms
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.google.cloud.spanner.DatabaseAdminClient;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.SessionPoolOptions;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
Expand All @@ -38,6 +39,7 @@
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.PartialResultSet;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.beam.sdk.options.ValueProvider;
Expand All @@ -61,6 +63,8 @@ public class SpannerAccessor implements AutoCloseable {
*/
private static final String USER_AGENT_PREFIX = "Apache_Beam_Java";

static final java.time.Duration DEFAULT_SESSION_WAIT_DURATION = java.time.Duration.ofMinutes(0);

/** Instance ID to use when connecting to an experimental host. */
public static final String EXPERIMENTAL_HOST_INSTANCE_ID = "default";

Expand Down Expand Up @@ -113,6 +117,11 @@ public static SpannerAccessor getOrCreate(SpannerConfig spannerConfig) {
static SpannerOptions buildSpannerOptions(SpannerConfig spannerConfig) {
SpannerOptions.Builder builder = SpannerOptions.newBuilder();

// TODO(https://github.com/apache/beam/issues/37451) Disable gRPC gcp extension which was
// causing the application thread to stall.
// Remove this once Spanner fixes the hanging issue
builder.disableGrpcGcpExtension();

Set<Code> retryableCodes = new HashSet<>();
if (spannerConfig.getRetryableCodes() != null) {
retryableCodes.addAll(spannerConfig.getRetryableCodes());
Expand Down Expand Up @@ -265,6 +274,15 @@ static SpannerOptions buildSpannerOptions(SpannerConfig spannerConfig) {
builder.setCredentials(credentials.get());
}

// ValueProvider<java.time.Duration> waitForSessionCreationDuration =
// spannerConfig.getWaitForSessionCreationDuration();
// java.time.Duration waitDuration =
// Optional.ofNullable(waitForSessionCreationDuration)
// .map(ValueProvider::get)
// .orElse(DEFAULT_SESSION_WAIT_DURATION);
// builder.setSessionPoolOption(
// SessionPoolOptions.newBuilder().setWaitForMinSessionsDuration(waitDuration).build());

return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ public String getHostValue() {

public abstract @Nullable ValueProvider<Credentials> getCredentials();

public abstract @Nullable ValueProvider<java.time.Duration> getWaitForSessionCreationDuration();

abstract Builder toBuilder();

public static SpannerConfig create() {
Expand Down Expand Up @@ -189,6 +191,9 @@ abstract Builder setExecuteStreamingSqlRetrySettings(

abstract Builder setPlainText(ValueProvider<Boolean> plainText);

abstract Builder setWaitForSessionCreationDuration(
ValueProvider<java.time.Duration> waitForSessionCreationDuration);

public abstract SpannerConfig build();
}

Expand Down Expand Up @@ -389,4 +394,24 @@ public SpannerConfig withUsingPlainTextChannel(ValueProvider<Boolean> plainText)
public SpannerConfig withUsingPlainTextChannel(boolean plainText) {
return withUsingPlainTextChannel(ValueProvider.StaticValueProvider.of(plainText));
}

/**
* Sets the wait time for a multiplexed session to be available when creating a database client.
*
* <p>Setting this will block the {@link com.google.cloud.spanner.DatabaseClient} creation.
*
* @param waitForSessionCreationDuration The duration to wait. Defaults to {@link
* SpannerAccessor#DEFAULT_SESSION_WAIT_DURATION}.
* @return {@link SpannerConfig}
*/
public SpannerConfig withWaitForSessionCreationDuration(
ValueProvider<java.time.Duration> waitForSessionCreationDuration) {
return toBuilder().setWaitForSessionCreationDuration(waitForSessionCreationDuration).build();
}

public SpannerConfig withWaitForSessionCreationDuration(
java.time.Duration waitForSessionCreationDuration) {
return withWaitForSessionCreationDuration(
ValueProvider.StaticValueProvider.of(waitForSessionCreationDuration));
}
}
Loading