UnsupportedOperationException when batch-writing to OTS
🏷️ OTS
Batch-writing rows to OTS with the SyncClient.batchWriteRow method failed with the following error:
txt
java.lang.reflect.InvocationTargetException: null
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.mokasz.zy.server.game.system.job.helper.ScheduleRunnable.run(ScheduleRunnable.java:38)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
at java.util.concurrent.FutureTask.run(FutureTask.java)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.UnsupportedOperationException: This is supposed to be overridden by subclasses.
at com.google.protobuf.GeneratedMessage.getUnknownFields(GeneratedMessage.java:262)
at com.alicloud.openservices.tablestore.core.protocol.OtsInternalApi$Condition.getSerializedSize(OtsInternalApi.java:4644)
at com.google.protobuf.CodedOutputStream.computeMessageSizeNoTag(CodedOutputStream.java:877)
at com.google.protobuf.CodedOutputStream.computeMessageSize(CodedOutputStream.java:661)
at com.alicloud.openservices.tablestore.core.protocol.OtsInternalApi$RowInBatchWriteRowRequest.getSerializedSize(OtsInternalApi.java:30734)
at com.google.protobuf.CodedOutputStream.computeMessageSizeNoTag(CodedOutputStream.java:877)
at com.google.protobuf.CodedOutputStream.computeMessageSize(CodedOutputStream.java:661)
at com.alicloud.openservices.tablestore.core.protocol.OtsInternalApi$TableInBatchWriteRowRequest.getSerializedSize(OtsInternalApi.java:31444)
at com.google.protobuf.CodedOutputStream.computeMessageSizeNoTag(CodedOutputStream.java:877)
at com.google.protobuf.CodedOutputStream.computeMessageSize(CodedOutputStream.java:661)
at com.alicloud.openservices.tablestore.core.protocol.OtsInternalApi$BatchWriteRowRequest.getSerializedSize(OtsInternalApi.java:32104)
at com.google.protobuf.AbstractMessageLite.toByteArray(AbstractMessageLite.java:69)
at com.alicloud.openservices.tablestore.core.OperationLauncher.asyncInvokePost(OperationLauncher.java:117)
at com.alicloud.openservices.tablestore.core.BatchWriteRowLauncher.fire(BatchWriteRowLauncher.java:64)
at com.alicloud.openservices.tablestore.InternalClient.batchWriteRow(InternalClient.java:609)
at com.alicloud.openservices.tablestore.SyncClient.batchWriteRow(SyncClient.java:282)
... 11 common frames omitted
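According to the official documentation, this error is caused by a dependency conflict. Sure enough, another dependency in the project pulls in protobuf-java 3.13.0, while tablestore 5.4.0, which the project uses, depends on protobuf-java 2.4.1. This matches the stack trace: the message classes in OtsInternalApi were generated for protobuf 2.x, and when they run against the 3.x runtime, GeneratedMessage.getUnknownFields is not overridden and throws UnsupportedOperationException.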
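To confirm which jar actually supplies the conflicting class at runtime, a small check like the one below can help. It is only a sketch: the `ClassOrigin` class and its `locate` helpers are hypothetical names made up for illustration, and the only thing taken from the stack trace is the class name `com.google.protobuf.GeneratedMessage`.

```java
import java.security.CodeSource;

public class ClassOrigin {

    /** Returns the location (usually a jar path) that a class was loaded from. */
    public static String locate(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        // Classes from the bootstrap loader (e.g. java.lang.String) have no code source.
        if (source == null || source.getLocation() == null) {
            return "(bootstrap or unknown)";
        }
        return source.getLocation().toString();
    }

    /** Looks the class up by name, so the check also works when it may be absent. */
    public static String locate(String className) {
        try {
            // initialize=false: we only want to know where the class lives, not run its static init.
            return locate(Class.forName(className, false, ClassOrigin.class.getClassLoader()));
        } catch (ClassNotFoundException e) {
            return "(not on classpath)";
        }
    }

    public static void main(String[] args) {
        // The class at the top of the "Caused by" section of the stack trace.
        System.out.println(locate("com.google.protobuf.GeneratedMessage"));
    }
}
```

Run inside the affected application, the printed path should point at the protobuf-java jar that won the conflict. In the situation described here, that would presumably be the 3.13.0 jar rather than 2.4.1.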
The official documentation also gives the fix: add the classifier and exclusions elements shown below to the tablestore dependency. Note that the <classifier>jar-with-dependencies</classifier> element must not be omitted.
xml
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>tablestore</artifactId>
    <version>replace with the version you are using</version>
    <classifier>jar-with-dependencies</classifier>
    <exclusions>
        <exclusion>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpasyncclient</artifactId>
        </exclusion>
    </exclusions>
</dependency>
Note
With the jar-with-dependencies classifier, the artifact bundles the HttpClient and Protobuf libraries it depends on under renamed packages, so it no longer depends on external HttpClient or Protobuf artifacts.
For reference, here is part of the sample code for batch writes with SyncClient:
java
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.model.*;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
@Slf4j
@RequiredArgsConstructor
public class OtsServiceImpl implements OtsService {

    private final SyncClient syncClient;

    @Override
    public void saveUserLogs(List<UserLogDO> userLogs) {
        // Collect one RowPutChange per log entry into a single batch request.
        BatchWriteRowRequest batchWriteRowRequest = new BatchWriteRowRequest();
        userLogs.stream()
                .map(UserLogDO::toRowPutChange)
                .forEach(batchWriteRowRequest::addRowChange);

        // A batch write can partially fail, so log both the succeeded and failed row counts.
        BatchWriteRowResponse batchWriteRowResponse = syncClient.batchWriteRow(batchWriteRowRequest);
        log.info("saveUserLogs SucceedRows: {} FailedRows: {}",
                batchWriteRowResponse.getSucceedRows().size(),
                batchWriteRowResponse.getFailedRows().size());
    }
}
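The sample above sends every row in one request. Tablestore limits how many rows a single BatchWriteRow call may carry, so for large lists it may be worth splitting the input first. The sketch below shows one way to do that with plain JDK collections. The `BatchChunks` class, the `partition` helper and the `MAX_ROWS_PER_BATCH` value of 200 are assumptions made for illustration, so check the limit against the Tablestore documentation for your SDK version.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchChunks {

    // Assumed per-request row cap; verify against the Tablestore limits documentation.
    static final int MAX_ROWS_PER_BATCH = 200;

    /** Splits a list into consecutive chunks of at most {@code size} elements. */
    public static <T> List<List<T>> partition(List<T> items, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int from = 0; from < items.size(); from += size) {
            int to = Math.min(from + size, items.size());
            // Copy the view so each chunk is independent of the source list.
            chunks.add(new ArrayList<>(items.subList(from, to)));
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < 450; i++) {
            rows.add(i);
        }
        // In saveUserLogs, each chunk would become its own BatchWriteRowRequest.
        for (List<Integer> chunk : partition(rows, MAX_ROWS_PER_BATCH)) {
            System.out.println(chunk.size()); // prints 200, 200, 50
        }
    }
}
```

In saveUserLogs, the loop body would build a BatchWriteRowRequest from each chunk and call syncClient.batchWriteRow once per chunk, checking getFailedRows() on every response.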