Presto Connectors

本文总结下 Presto Connectors 的 SPI 实现。在 Presto 中,UDF、EventListener、DataTypes、ParameterTypes、Access Control、Resource Group 以及 Connector
都是通过插件机制实现的。

定义插件

Presto 将支持的所有插件类型封装在顶层接口 Plugin 中:

各类插件通过该接口中对应的 getXxx 工厂方法暴露,默认实现均返回空集合,插件只需覆盖自己提供的那几类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public interface Plugin{

/**
* Connector
*/
default Iterable<ConnectorFactory> getConnectorFactories() {
return emptyList();
}

default Iterable<BlockEncoding> getBlockEncodings() {
return emptyList();
}

/**
* Data Types
*/
default Iterable<Type> getTypes() {
return emptyList();
}

/**
* Parameter Types
*/
default Iterable<ParametricType> getParametricTypes() {
return emptyList();
}

/**
* UDF
*/
default Set<Class<?>> getFunctions() {
return emptySet();
}

default Iterable<SystemAccessControlFactory> getSystemAccessControlFactories() {
return emptyList();
}

default Iterable<PasswordAuthenticatorFactory> getPasswordAuthenticatorFactories() {
return emptyList();
}

/**
* EventListener
*/
default Iterable<EventListenerFactory> getEventListenerFactories() {
return emptyList();
}

default Iterable<ResourceGroupConfigurationManagerFactory> getResourceGroupConfigurationManagerFactories() {
return emptyList();
}

default Iterable<SessionPropertyConfigurationManagerFactory> getSessionPropertyConfigurationManagerFactories() {
return emptyList();
}

default Iterable<FunctionNamespaceManagerFactory> getFunctionNamespaceManagerFactories() {
return emptyList();
}

default Iterable<TempStorageFactory> getTempStorageFactories() {
return emptyList();
}

default Iterable<QueryPrerequisitesFactory> getQueryPrerequisitesFactories() {
return emptyList();
}
}

加载插件

插件加载时序图:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
@ThreadSafe
public class PluginManager {

public void loadPlugins() throws Exception {


// installedPluginsDir 通过 config.properties 配置文件中的 plugin.dir 指定
for (File file : listFiles(installedPluginsDir)) {
if (file.isDirectory()) {
loadPlugin(file.getAbsolutePath());
}
}

for (String plugin : plugins) {
loadPlugin(plugin);
}

}

/**
* 逐个加载 plugin,每个插件都要独立的路径,new 一个类加载器
*/
private void loadPlugin(String plugin) throws Exception {

log.info("-- Loading plugin %s --", plugin);
URLClassLoader pluginClassLoader = buildClassLoader(plugin);
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(pluginClassLoader)) {
loadPlugin(pluginClassLoader);
}
log.info("-- Finished loading plugin %s --", plugin);
}


/**
* 使用 SPI 机制加载实现了 Plugin 接口的类
*/
private void loadPlugin(URLClassLoader pluginClassLoader) {
ServiceLoader<Plugin> serviceLoader = ServiceLoader.load(Plugin.class, pluginClassLoader);
List<Plugin> plugins = ImmutableList.copyOf(serviceLoader);

if (plugins.isEmpty()) {
log.warn("No service providers of type %s", Plugin.class.getName());
}

for (Plugin plugin : plugins) {
log.info("Installing %s", plugin.getClass().getName());
installPlugin(plugin);
}
}


/**
* 装置 plugin ,将 plugin 单独实现的插件们,设置到 xxxManager 中
* 如 Connector 插件存放到 ConnectorManager 中,xxxManager 中都会有一个内存 Map,称之为 factories
*/
public void installPlugin(Plugin plugin) {

for (ConnectorFactory connectorFactory : plugin.getConnectorFactories()) {

log.info("Registering connector %s", connectorFactory.getName());
connectorManager.addConnectorFactory(connectorFactory);
}

for (QueryPrerequisitesFactory queryPrerequisitesFactory : plugin.getQueryPrerequisitesFactories()) {
log.info("Registering query prerequisite factory %s", queryPrerequisitesFactory.getName());
queryPrerequisitesManager.addQueryPrerequisitesFactory(queryPrerequisitesFactory);
}

...
}

/**
* 为 plugin 构建 URLClassLoader
*/
private URLClassLoader buildClassLoader(String plugin)
throws Exception {
File file = new File(plugin);
if (file.isFile() && (file.getName().equals("pom.xml") || file.getName().endsWith(".pom"))) {
return buildClassLoaderFromPom(file);
}
if (file.isDirectory()) {
return buildClassLoaderFromDirectory(file);
}
return buildClassLoaderFromCoordinates(plugin);
}

/**
* Presto 没有走配置 META-INF/services 来实现 SPI ,而是实现了一套插件自动发现功能,自动创建了对应的目录和文件
*/
private URLClassLoader buildClassLoaderFromPom(File pomFile)
throws Exception {
List<Artifact> artifacts = resolver.resolvePom(pomFile);
URLClassLoader classLoader = createClassLoader(artifacts, pomFile.getPath());

Artifact artifact = artifacts.get(0);

// 插件声明的发现
Set<String> plugins = discoverPlugins(artifact, classLoader);
if (!plugins.isEmpty()) {

// 插件声明的写入
writePluginServices(plugins, artifact.getFile());
}

return classLoader;
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
final class PluginDiscovery {

private static final String CLASS_FILE_SUFFIX = ".class";
private static final String SERVICES_FILE = "META-INF/services/" + Plugin.class.getName();

/**
* 插件声明的发现
*/
public static Set<String> discoverPlugins(Artifact artifact, ClassLoader classLoader) throws IOException {
// ...

if (new File(file, SERVICES_FILE).exists()) {
return ImmutableSet.of();
}

return listClasses(file.toPath()).stream()
.filter(name -> classInterfaces(name, classLoader).contains(Plugin.class.getName()))
.collect(toImmutableSet());
}

/**
* 插件声明的写入
*/
public static void writePluginServices(Iterable<String> plugins, File root) throws IOException {

Path path = root.toPath().resolve(SERVICES_FILE);

createDirectories(path.getParent());

try (Writer out = Files.newBufferedWriter(path, UTF_8)) {
for (String plugin : plugins) {
out.write(plugin + "\n");
}
}
}
}

加载 Catalogs

Catalog -> Schema -> Table 。同一个 ConnectorFactory 可以创建多个 catalog 实例,例如每一个 MySQL 实例都对应一个 catalog 。

加载 Catalogs 时序图:

  1. PrestoServer.run() 调用 StaticCatalogStore.loadCatalogs()

  2. StaticCatalogStore.loadCatalogs() 的具体调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class StaticCatalogStore {

/**
* 加载 etc/catalog 目录下的所有 properties 文件
*/
public void loadCatalogs(Map<String, Map<String, String>> additionalCatalogs) throws Exception {

for (File file : listFiles(catalogConfigurationDir)) {
if (file.isFile() && file.getName().endsWith(".properties")) {
loadCatalog(file);
}
}
}


/**
* 加载某一个具体的 catalog
*/
private void loadCatalog(String catalogName, Map<String, String> properties) {

log.info("-- Loading catalog %s --", catalogName);

String connectorName = null;
ImmutableMap.Builder<String, String> connectorProperties = ImmutableMap.builder();
for (Entry<String, String> entry : properties.entrySet()) {

// 读取 catalog.properties 中配置的 connector.name
if (entry.getKey().equals("connector.name")) {
connectorName = entry.getValue();
}
else {
connectorProperties.put(entry.getKey(), entry.getValue());
}
}

// 为 catalog 创建真正的物理连接
connectorManager.createConnection(catalogName, connectorName, connectorProperties.build());
log.info("-- Added catalog %s using connector %s --", catalogName, connectorName);
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@ThreadSafe
public class ConnectorManager {

/**
* 创建一个 catalog connector ,根据 connectorName 和 catalogName 两个入参,结果输出 catalog 对应的 connectorId
*/
private synchronized ConnectorId createConnection(String catalogName, ConnectorFactory connectorFactory, Map<String, String> properties) {

ConnectorId connectorId = new ConnectorId(catalogName);
checkState(!connectors.containsKey(connectorId), "A connector %s already exists", connectorId);

addCatalogConnector(catalogName, connectorId, connectorFactory, properties);

return connectorId;
}

}

IcebergConnectorFactory

下面以 iceberg 为例,看下如何实现一个 ConnectorFactory 。

实现一个 ConnectorFactory ,需要实现 getName()、getHandleResolver()、create() 这 3 个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class IcebergConnectorFactory implements ConnectorFactory {

@Override
public String getName() {
return "iceberg";
}

/**
* 返回各自实现的 ConnectorHandleResolver
*/
@Override
public ConnectorHandleResolver getHandleResolver() {
return new IcebergHandleResolver();
}

/**
* 用于创建 Connector
*/
@Override
public Connector create(String catalogName, Map<String, String> config, ConnectorContext context) {

ClassLoader classLoader = IcebergConnectorFactory.class.getClassLoader();

try {

// 通过 java 反射,调用 InternalIcebergConnectorFactory.createConnector 创建 iceberg connector
return (Connector) classLoader.loadClass(InternalIcebergConnectorFactory.class.getName())
.getMethod("createConnector", String.class, Map.class, ConnectorContext.class, Optional.class)
.invoke(null, catalogName, config, context, Optional.empty());

} catch (InvocationTargetException e) {
Throwable targetException = e.getTargetException();
throwIfUnchecked(targetException);
throw new RuntimeException(targetException);
} catch (ReflectiveOperationException e) {
throw new RuntimeException(e);
}

}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
/**
* 大体是定义关于读写 iceberg 数据源的操作,后续再细拆
*/
public final class InternalIcebergConnectorFactory {

private InternalIcebergConnectorFactory() {
}

public static Connector createConnector(String catalogName, Map<String, String> config, ConnectorContext context, Optional<ExtendedHiveMetastore> metastore){

ClassLoader classLoader = InternalIcebergConnectorFactory.class.getClassLoader();

try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {

Bootstrap app = new Bootstrap(
new EventModule(),
new MBeanModule(),
new JsonModule(),
new IcebergModule(),
new IcebergMetastoreModule(),
new HiveS3Module(catalogName),
new HiveAuthenticationModule(),
new HiveMetastoreModule(catalogName, metastore),
binder -> {
binder.bind(NodeVersion.class).toInstance(new NodeVersion(context.getNodeManager().getCurrentNode().getVersion()));
binder.bind(NodeManager.class).toInstance(context.getNodeManager());
binder.bind(TypeManager.class).toInstance(context.getTypeManager());
binder.bind(PageIndexerFactory.class).toInstance(context.getPageIndexerFactory());
});

/*
* Google Guice 是从 Google 开源的一款轻量级依赖注入框架;
*
* Guice:整个框架的门面,通过 Guice 获得 Injector 实例;
* Injector:一个依赖的管理上下文
* Binder:一个接口和实现的绑定
* Module:一组 Binder ,绑定一组被依赖的中间服务
* Provider:bean 的提供者
* Scope:Provider 的作用域
* @Inject:类似 Spring Autowired
* @Name:和 @Inject 配合使用,类似 Spring Resource
*
*/
Injector injector = app
.doNotInitializeLogging()
.setRequiredConfigurationProperties(config)
.initialize();

LifeCycleManager lifeCycleManager = injector.getInstance(LifeCycleManager.class);
IcebergTransactionManager transactionManager = injector.getInstance(IcebergTransactionManager.class);

// 获取数据表元数据
IcebergMetadataFactory metadataFactory = injector.getInstance(IcebergMetadataFactory.class);

// 处理分任务逻辑
ConnectorSplitManager splitManager = injector.getInstance(ConnectorSplitManager.class);

// 读取数据
ConnectorPageSourceProvider connectorPageSource = injector.getInstance(ConnectorPageSourceProvider.class);

// 写入数据
ConnectorPageSinkProvider pageSinkProvider = injector.getInstance(ConnectorPageSinkProvider.class);

ConnectorNodePartitioningProvider connectorDistributionProvider = injector.getInstance(ConnectorNodePartitioningProvider.class);
IcebergSessionProperties icebergSessionProperties = injector.getInstance(IcebergSessionProperties.class);
IcebergTableProperties icebergTableProperties = injector.getInstance(IcebergTableProperties.class);
Set<Procedure> procedures = injector.getInstance((Key<Set<Procedure>>) Key.get(Types.setOf(Procedure.class)));

return new IcebergConnector(
lifeCycleManager,
transactionManager,
metadataFactory,
new ClassLoaderSafeConnectorSplitManager(splitManager, classLoader),
new ClassLoaderSafeConnectorPageSourceProvider(connectorPageSource, classLoader),
new ClassLoaderSafeConnectorPageSinkProvider(pageSinkProvider, classLoader),
new ClassLoaderSafeNodePartitioningProvider(connectorDistributionProvider, classLoader),
ImmutableSet.of(),
icebergSessionProperties.getSessionProperties(),
IcebergSchemaProperties.SCHEMA_PROPERTIES,
icebergTableProperties.getTableProperties(),
new AllowAllAccessControl(),
procedures);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* ConnectorHandleResolver 接口用于对数据源 schema 结构等解析处理
*/
public class IcebergHandleResolver implements ConnectorHandleResolver {

@Override
public Class<? extends ConnectorTableHandle> getTableHandleClass() {
return IcebergTableHandle.class;
}

@Override
public Class<? extends ConnectorTableLayoutHandle> getTableLayoutHandleClass() {
return IcebergTableLayoutHandle.class;
}

@Override
public Class<? extends ColumnHandle> getColumnHandleClass() {
return IcebergColumnHandle.class;
}

@Override
public Class<? extends ConnectorSplit> getSplitClass() {
return IcebergSplit.class;
}

@Override
public Class<? extends ConnectorOutputTableHandle> getOutputTableHandleClass() {
return IcebergWritableTableHandle.class;
}

@Override
public Class<? extends ConnectorInsertTableHandle> getInsertTableHandleClass() {
return IcebergWritableTableHandle.class;
}

@Override
public Class<? extends ConnectorTransactionHandle> getTransactionHandleClass() {
return HiveTransactionHandle.class;
}
}

参考资料

Presto Connector 实现原理