Flink 的 REST 服务器端点基于 Netty 实现,位于 package org.apache.flink.runtime.rest。
启动服务
入口类:StandaloneSessionClusterEntrypoint;资源分发工厂类:DefaultDispatcherResourceManagerComponentFactory
WebMonitorEndpoint.createExecutorService( configuration.getInteger(RestOptions.SERVER_NUM_THREADS), configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY), "DispatcherRestEndpoint");
WebMonitorEndpoint 构造方法
public WebMonitorEndpoint(GatewayRetriever leaderRetriever,Configuration clusterConfiguration,RestHandlerConfiguration restConfiguration,GatewayRetriever resourceManagerRetriever,TransientBlobService transientBlobService,ScheduledExecutorService executor,MetricFetcher metricFetcher,LeaderElectionService leaderElectionService,ExecutionGraphCache executionGraphCache,FatalErrorHandler fatalErrorHandler)throws IOException, ConfigurationException {super(clusterConfiguration);this.leaderRetriever = Preconditions.checkNotNull(leaderRetriever);this.clusterConfiguration = Preconditions.checkNotNull(clusterConfiguration);this.restConfiguration = Preconditions.checkNotNull(restConfiguration);this.resourceManagerRetriever = Preconditions.checkNotNull(resourceManagerRetriever);this.transientBlobService = Preconditions.checkNotNull(transientBlobService);this.executor = Preconditions.checkNotNull(executor);this.executionGraphCache = executionGraphCache;this.checkpointStatsCache =new CheckpointStatsCache(restConfiguration.getMaxCheckpointStatisticCacheEntries());this.metricFetcher = metricFetcher;this.leaderElectionService = Preconditions.checkNotNull(leaderElectionService);this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);}
初始化API
protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers( final CompletableFuture<String> localAddressFuture)
例如:ClusterOverviewHandler、DashboardConfigHandler、JobIdsHandler ……
抽象类RestServerEndpoint 构造方法
public RestServerEndpoint(Configuration configuration)
获取restAddress、restBindAddress、restBindPortRange等初始化参数。
注册具体的请求处理器
private static void registerHandler( Router router, String handlerURL, HttpMethodWrapper httpMethod, ChannelInboundHandler handler) { switch (httpMethod) { case GET: router.addGet(handlerURL, handler); break; case POST: router.addPost(handlerURL, handler); break; case DELETE: router.addDelete(handlerURL, handler); break; case PATCH: router.addPatch(handlerURL, handler); break; default: throw new RuntimeException("Unsupported http method: " + httpMethod + '.'); } }
初始化管道
if (isHttpsEnabled()) { ch.pipeline() .addLast( "ssl", new RedirectingSslHandler( restAddress, restAddressFuture, sslHandlerFactory)); }
ch.pipeline().addLast(new HttpServerCodec()).addLast(new FileUploadHandler(uploadDir)).addLast(new FlinkHttpObjectAggregator(maxContentLength, responseHeaders));
开启多事件循环
NioEventLoopGroup bossGroup = new NioEventLoopGroup( 1, new ExecutorThreadFactory("flink-rest-server-netty-boss")); NioEventLoopGroup workerGroup = new NioEventLoopGroup( 0, new ExecutorThreadFactory("flink-rest-server-netty-worker"));
服务事件绑定
bootstrap = new ServerBootstrap(); bootstrap .group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(initializer);
端口绑定
while (portsIterator.hasNext()) {
    try {
        chosenPort = portsIterator.next();
        final ChannelFuture channel;
        if (restBindAddress == null) {
            channel = bootstrap.bind(chosenPort);
        } else {
            channel = bootstrap.bind(restBindAddress, chosenPort);
        }
        serverChannel = channel.syncUninterruptibly().channel();
        break;
    } catch (final Exception e) {
        // syncUninterruptibly() throws checked exceptions via Unsafe
        // continue if the exception is due to the port being in use, fail early
        // otherwise
        if (!(e instanceof java.net.BindException)) {
            throw e;
        }
    }
}
Web实现类
ArrayList<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = new ArrayList<>(30);
继承的抽象类AbstractRestHandler
文章转自:Flink Rest服务器端点实现_Java-答学网
作者:答学网,转载请注明原文链接:http://www.dxzl8.com/