A look at Flink's RestClientConfiguration
Published: 2019-06-18

This article takes a look at Flink's RestClientConfiguration.

RestClientConfiguration

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java

public final class RestClientConfiguration {

    @Nullable
    private final SSLHandlerFactory sslHandlerFactory;

    private final long connectionTimeout;

    private final long idlenessTimeout;

    private final int maxContentLength;

    private RestClientConfiguration(
            @Nullable final SSLHandlerFactory sslHandlerFactory,
            final long connectionTimeout,
            final long idlenessTimeout,
            final int maxContentLength) {
        checkArgument(maxContentLength > 0, "maxContentLength must be positive, was: %d", maxContentLength);
        this.sslHandlerFactory = sslHandlerFactory;
        this.connectionTimeout = connectionTimeout;
        this.idlenessTimeout = idlenessTimeout;
        this.maxContentLength = maxContentLength;
    }

    /**
     * Returns the {@link SSLEngine} that the REST client endpoint should use.
     *
     * @return SSLEngine that the REST client endpoint should use, or null if SSL was disabled
     */
    @Nullable
    public SSLHandlerFactory getSslHandlerFactory() {
        return sslHandlerFactory;
    }

    /**
     * {@see RestOptions#CONNECTION_TIMEOUT}.
     */
    public long getConnectionTimeout() {
        return connectionTimeout;
    }

    /**
     * {@see RestOptions#IDLENESS_TIMEOUT}.
     */
    public long getIdlenessTimeout() {
        return idlenessTimeout;
    }

    /**
     * Returns the max content length that the REST client endpoint could handle.
     *
     * @return max content length that the REST client endpoint could handle
     */
    public int getMaxContentLength() {
        return maxContentLength;
    }

    /**
     * Creates and returns a new {@link RestClientConfiguration} from the given {@link Configuration}.
     *
     * @param config configuration from which the REST client endpoint configuration should be created from
     * @return REST client endpoint configuration
     * @throws ConfigurationException if SSL was configured incorrectly
     */
    public static RestClientConfiguration fromConfiguration(Configuration config) throws ConfigurationException {
        Preconditions.checkNotNull(config);

        final SSLHandlerFactory sslHandlerFactory;
        if (SSLUtils.isRestSSLEnabled(config)) {
            try {
                sslHandlerFactory = SSLUtils.createRestClientSSLEngineFactory(config);
            } catch (Exception e) {
                throw new ConfigurationException("Failed to initialize SSLContext for the REST client", e);
            }
        } else {
            sslHandlerFactory = null;
        }

        final long connectionTimeout = config.getLong(RestOptions.CONNECTION_TIMEOUT);

        final long idlenessTimeout = config.getLong(RestOptions.IDLENESS_TIMEOUT);

        int maxContentLength = config.getInteger(RestOptions.CLIENT_MAX_CONTENT_LENGTH);

        return new RestClientConfiguration(sslHandlerFactory, connectionTimeout, idlenessTimeout, maxContentLength);
    }
}
  • RestClientConfiguration has four fields: sslHandlerFactory, connectionTimeout, idlenessTimeout and maxContentLength
  • The fromConfiguration method builds an SSLHandlerFactory from the Configuration only when REST SSL is enabled (otherwise the factory is null). The relevant options are security.ssl.rest.enabled (default false), security.ssl.protocol (default TLSv1.2), security.ssl.algorithms (default TLS_RSA_WITH_AES_128_CBC_SHA) and security.ssl.rest.authentication-enabled (default false)
  • connectionTimeout is read from rest.connection-timeout (default 15000 ms); idlenessTimeout is read from rest.idleness-timeout (default 5 minutes); maxContentLength is read from rest.client.max-content-length (default 104_857_600 bytes, i.e. 100 MB)
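The pattern described above (immutable fields, a private constructor that validates maxContentLength, and a static factory that falls back to defaults) can be sketched without any Flink dependency. The class below is a simplified stand-in written for illustration: the name SimpleRestClientConfig, the fromMap method and the use of a plain Map<String, String> are assumptions of this sketch, not Flink's API. Only the option keys and default values come from the text above, and SSL handling is left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Simplified stand-in for illustration only; this is NOT Flink's RestClientConfiguration.
final class SimpleRestClientConfig {
    // Option keys and defaults as listed in the bullets above.
    static final String CONNECTION_TIMEOUT_KEY = "rest.connection-timeout";
    static final String IDLENESS_TIMEOUT_KEY = "rest.idleness-timeout";
    static final String MAX_CONTENT_LENGTH_KEY = "rest.client.max-content-length";

    static final long DEFAULT_CONNECTION_TIMEOUT_MS = 15_000L;
    static final long DEFAULT_IDLENESS_TIMEOUT_MS = 5L * 60L * 1000L; // 5 minutes
    static final int DEFAULT_MAX_CONTENT_LENGTH = 104_857_600;

    private final long connectionTimeout;
    private final long idlenessTimeout;
    private final int maxContentLength;

    // Private constructor: instances are only created through the factory method.
    private SimpleRestClientConfig(long connectionTimeout, long idlenessTimeout, int maxContentLength) {
        if (maxContentLength <= 0) {
            throw new IllegalArgumentException("maxContentLength must be positive, was: " + maxContentLength);
        }
        this.connectionTimeout = connectionTimeout;
        this.idlenessTimeout = idlenessTimeout;
        this.maxContentLength = maxContentLength;
    }

    // Hypothetical counterpart of fromConfiguration: missing keys fall back to the defaults.
    static SimpleRestClientConfig fromMap(Map<String, String> config) {
        Objects.requireNonNull(config, "config");
        long connectionTimeout = Long.parseLong(
                config.getOrDefault(CONNECTION_TIMEOUT_KEY, String.valueOf(DEFAULT_CONNECTION_TIMEOUT_MS)));
        long idlenessTimeout = Long.parseLong(
                config.getOrDefault(IDLENESS_TIMEOUT_KEY, String.valueOf(DEFAULT_IDLENESS_TIMEOUT_MS)));
        int maxContentLength = Integer.parseInt(
                config.getOrDefault(MAX_CONTENT_LENGTH_KEY, String.valueOf(DEFAULT_MAX_CONTENT_LENGTH)));
        return new SimpleRestClientConfig(connectionTimeout, idlenessTimeout, maxContentLength);
    }

    long getConnectionTimeout() { return connectionTimeout; }

    long getIdlenessTimeout() { return idlenessTimeout; }

    int getMaxContentLength() { return maxContentLength; }
}

class SimpleRestClientConfigDemo {
    public static void main(String[] args) {
        Map<String, String> settings = new HashMap<>();
        settings.put("rest.connection-timeout", "5000"); // override one option, keep the other defaults
        SimpleRestClientConfig config = SimpleRestClientConfig.fromMap(settings);
        System.out.println(config.getConnectionTimeout());
        System.out.println(config.getIdlenessTimeout());
        System.out.println(config.getMaxContentLength());
    }
}
```

Keeping the constructor private means the positivity check on maxContentLength cannot be bypassed, which is the same design choice visible in the Flink source quoted above.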

RestClient

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java

public class RestClient implements AutoCloseableAsync {
    private static final Logger LOG = LoggerFactory.getLogger(RestClient.class);

    private static final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();

    // used to open connections to a rest server endpoint
    private final Executor executor;

    private final Bootstrap bootstrap;

    private final CompletableFuture<Void> terminationFuture;

    private final AtomicBoolean isRunning = new AtomicBoolean(true);

    public RestClient(RestClientConfiguration configuration, Executor executor) {
        Preconditions.checkNotNull(configuration);
        this.executor = Preconditions.checkNotNull(executor);
        this.terminationFuture = new CompletableFuture<>();

        final SSLHandlerFactory sslHandlerFactory = configuration.getSslHandlerFactory();
        ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) {
                try {
                    // SSL should be the first handler in the pipeline
                    if (sslHandlerFactory != null) {
                        socketChannel.pipeline().addLast("ssl", sslHandlerFactory.createNettySSLHandler());
                    }

                    socketChannel.pipeline()
                        .addLast(new HttpClientCodec())
                        .addLast(new HttpObjectAggregator(configuration.getMaxContentLength()))
                        .addLast(new ChunkedWriteHandler()) // required for multipart-requests
                        .addLast(new IdleStateHandler(configuration.getIdlenessTimeout(), configuration.getIdlenessTimeout(), configuration.getIdlenessTimeout(), TimeUnit.MILLISECONDS))
                        .addLast(new ClientHandler());
                } catch (Throwable t) {
                    t.printStackTrace();
                    ExceptionUtils.rethrow(t);
                }
            }
        };
        NioEventLoopGroup group = new NioEventLoopGroup(1, new ExecutorThreadFactory("flink-rest-client-netty"));

        bootstrap = new Bootstrap();
        bootstrap
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(configuration.getConnectionTimeout()))
            .group(group)
            .channel(NioSocketChannel.class)
            .handler(initializer);

        LOG.info("Rest client endpoint started.");
    }

    @Override
    public CompletableFuture<Void> closeAsync() {
        return shutdownInternally(Time.seconds(10L));
    }

    public void shutdown(Time timeout) {
        final CompletableFuture<Void> shutDownFuture = shutdownInternally(timeout);

        try {
            shutDownFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            LOG.info("Rest endpoint shutdown complete.");
        } catch (Exception e) {
            LOG.warn("Rest endpoint shutdown failed.", e);
        }
    }

    private CompletableFuture<Void> shutdownInternally(Time timeout) {
        if (isRunning.compareAndSet(true, false)) {
            LOG.info("Shutting down rest endpoint.");

            if (bootstrap != null) {
                if (bootstrap.group() != null) {
                    bootstrap.group().shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
                        .addListener(finished -> {
                            if (finished.isSuccess()) {
                                terminationFuture.complete(null);
                            } else {
                                terminationFuture.completeExceptionally(finished.cause());
                            }
                        });
                }
            }
        }
        return terminationFuture;
    }

    //......
}
  • The RestClient constructor takes two parameters, a RestClientConfiguration and an Executor, and creates a Netty Bootstrap. ChannelOption.CONNECT_TIMEOUT_MILLIS is set from configuration.getConnectionTimeout(); the readerIdleTime, writerIdleTime and allIdleTime of the IdleStateHandler are all set from configuration.getIdlenessTimeout(); the maxContentLength of the HttpObjectAggregator comes from configuration.getMaxContentLength(); and the SSLHandlerFactory comes from configuration.getSslHandlerFactory()
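One detail in the constructor is easy to miss: the connection timeout is stored as a long, but it is passed through Math.toIntExact before being used for CONNECT_TIMEOUT_MILLIS. The sketch below uses only the JDK to show what that conversion does. The class and method names are hypothetical and exist only for this illustration. Because Math.toIntExact throws on overflow, a configured value larger than Integer.MAX_VALUE milliseconds would presumably fail at construction time rather than being silently truncated.

```java
// Illustration only: mirrors the long-to-int conversion applied to the connection timeout.
final class ConnectTimeoutConversion {

    // Hypothetical helper; the Flink constructor calls Math.toIntExact inline.
    static int toConnectTimeoutMillis(long connectionTimeoutMillis) {
        // Throws ArithmeticException if the value does not fit into an int.
        return Math.toIntExact(connectionTimeoutMillis);
    }

    public static void main(String[] args) {
        // The default of 15000 ms fits comfortably into an int.
        System.out.println(toConnectTimeoutMillis(15_000L));

        try {
            // One millisecond above Integer.MAX_VALUE no longer fits.
            toConnectTimeoutMillis(Integer.MAX_VALUE + 1L);
        } catch (ArithmeticException e) {
            System.out.println("rejected oversized timeout");
        }
    }
}
```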

Summary

  • RestClientConfiguration has four fields: sslHandlerFactory, connectionTimeout, idlenessTimeout and maxContentLength. The fromConfiguration method builds the SSLHandlerFactory from the Configuration when REST SSL is enabled, based on security.ssl.rest.enabled (default false), security.ssl.protocol (default TLSv1.2), security.ssl.algorithms (default TLS_RSA_WITH_AES_128_CBC_SHA) and security.ssl.rest.authentication-enabled (default false)
  • connectionTimeout is read from rest.connection-timeout (default 15000 ms); idlenessTimeout is read from rest.idleness-timeout (default 5 minutes); maxContentLength is read from rest.client.max-content-length (default 104_857_600 bytes)
  • The RestClient constructor takes a RestClientConfiguration and an Executor and creates a Netty Bootstrap, wiring the connection timeout into ChannelOption.CONNECT_TIMEOUT_MILLIS, the idleness timeout into all three idle times of the IdleStateHandler, the max content length into the HttpObjectAggregator, and the SSLHandlerFactory into the first handler of the pipeline

doc

Reposted from: https://juejin.im/post/5c80b29df265da2d961839aa
