### What changes were proposed in this pull request?
`CelebornBufferStream` should invoke `openStreamInternal` in `moveToNextPartitionIfPossible` to avoid client creation timeout.
### Why are the changes needed?
There are many `CelebornIOException` that is cause by timeout client creation in production environment as follows:
```
2025-08-22 16:20:10,681 INFO [flink-akka.actor.default-dispatcher-40] org.apache.flink.runtime.executiongraph.ExecutionGraph [] - [vertex-2]Calc(select=[lz4sql, rawsize, obcluster, ds, hh, mm, PROCTIME() AS $6]) -> Sort(orderBy=[lz4sql ASC, rawsize ASC, obcluster ASC, ds ASC, hh ASC, mm ASC, $6 DESC]) -> OverAggregate(partitionBy=[lz4sql, rawsize, obcluster, ds, hh, mm], orderBy=[$6 DESC], window#0=[ROW_NUMBER(*) AS w0$o0 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[lz4sql, rawsize, obcluster, ds, hh, mm, $6, w0$o0]) -> Calc(select=[lz4sql, rawsize, obcluster, ds, hh, mm], where=[(w0$o0 = 1)]) (668/1900) (d8bf48183d8c69a1ab84bcd445f6d4ed_0e8289f2bf927649dd2511bbc2bb6759_667_0) switched from RUNNING to FAILED on antc4flink4172792604-taskmanager-403 (dataPort=1).
java.io.IOException: org.apache.celeborn.common.exception.CelebornIOException: Connecting to /:9093 timed out (60000 ms)
at org.apache.celeborn.common.network.client.TransportClientFactory.internalCreateClient(TransportClientFactory.java:313)
at org.apache.celeborn.common.network.client.TransportClientFactory.createClient(TransportClientFactory.java:250)
at org.apache.celeborn.common.network.client.TransportClientFactory.retryCreateClient(TransportClientFactory.java:157)
at org.apache.celeborn.plugin.flink.network.FlinkTransportClientFactory.createClientWithRetry(FlinkTransportClientFactory.java:51)
at org.apache.celeborn.plugin.flink.readclient.CelebornBufferStream.openStreamInternal(CelebornBufferStream.java:200)
at org.apache.celeborn.plugin.flink.readclient.CelebornBufferStream.moveToNextPartitionIfPossible(CelebornBufferStream.java:183)
at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.onStreamEnd(RemoteBufferStreamReader.java:161)
at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.lambda$new$0(RemoteBufferStreamReader.java:79)
at org.apache.celeborn.plugin.flink.network.ReadClientHandler.processMessageInternal(ReadClientHandler.java:64)
at org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:100)
at org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:111)
at org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:76)
at org.apache.celeborn.common.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:100)
at org.apache.celeborn.common.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:84)
at org.apache.celeborn.common.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:156)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at org.apache.celeborn.shaded.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at org.apache.celeborn.plugin.flink.network.TransportFrameDecoderWithBufferSupplier.decodeBody(TransportFrameDecoderWithBufferSupplier.java:95)
at org.apache.celeborn.plugin.flink.network.TransportFrameDecoderWithBufferSupplier.channelRead(TransportFrameDecoderWithBufferSupplier.java:184)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at org.apache.celeborn.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at org.apache.celeborn.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at org.apache.celeborn.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at org.apache.celeborn.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:991)
at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.errorReceived(RemoteBufferStreamReader.java:146) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.lambda$new$0(RemoteBufferStreamReader.java:77) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.plugin.flink.readclient.CelebornBufferStream.moveToNextPartitionIfPossible(CelebornBufferStream.java:193) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.onStreamEnd(RemoteBufferStreamReader.java:161) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.lambda$new$0(RemoteBufferStreamReader.java:79) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.plugin.flink.network.ReadClientHandler.processMessageInternal(ReadClientHandler.java:64) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:100) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:111) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:76) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.common.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:100) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.common.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:84) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.common.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:156) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.plugin.flink.network.TransportFrameDecoderWithBufferSupplier.decodeBody(TransportFrameDecoderWithBufferSupplier.java:95) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.plugin.flink.network.TransportFrameDecoderWithBufferSupplier.channelRead(TransportFrameDecoderWithBufferSupplier.java:184) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at java.lang.Thread.run(Thread.java:991) ~[?:?]
```
`CelebornBufferStream` should invoke `openStreamInternal` in `moveToNextPartitionIfPossible` to avoid client creation timeout, which is caused by creating a client using the callback thread of netty.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manual test.
Closes#3450 from SteNicholas/CELEBORN-2129.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: SteNicholas <programgeek@163.com>