介绍

开发者手册

在conf/server.yaml配置文件中有下面这一段配置,就是关于集群部署的

mode:
#  type: standalone
  type: Cluster
  repository:
    type: ZooKeeper
    props:
      namespace: governance_ds
      server-lists: localhost:2181
      retryIntervalMilliseconds: 500
      timeToLiveSeconds: 60
      maxRetries: 3
      operationTimeoutMilliseconds: 500

ShardingSphere支持两种运行模式,Standalone独立模式和Cluster集群模式。

在Standalone独立模式下,ShardingSphere不需要考虑其他实例的影响,直接在内存中管理核心配置规则就可以了。如果把整个mode都注释掉,他就是ShardingSphere默认的运行模式。

而在Cluster集群模式下,ShardingSphere不光要考虑自己的配置规则,还需要考虑如何跟集群中的其他实例同步自己的配置规则。这就需要引入第三方组件来提供配置信息同步。ShardingSphere目前支持的配置中心包括:Zookeeper、etcd、Nacos、Consule

但是在ShardingSphere分库分表的场景下,这些配置信息几乎不会变动,访问频率也不会太高。所以,最为推荐的,是基于CP架构的Zookeeper。

另外,如果应用的本地和Zookeeper中都有配置信息,那么ShardingSphere会以Zookeeper中的配置为准。

使用Zookeeper进行集群部署

接下来我们可以基于Zookeeper部署一下ShardingProxy集群,看一下ShardingSphere需要同步的配置有哪些。

我们只需要在本地部署一个Zookeeper,然后将server.yaml中的mode部分解除注释:

mode:
  type: Cluster
  repository:
    type: ZooKeeper
    props:
      namespace: governance_ds
      server-lists: localhost:2181
      retryIntervalMilliseconds: 500
      timeToLiveSeconds: 60
      maxRetries: 3
      operationTimeoutMilliseconds: 500

启动ShardingProxy服务后,可以看到Zookeeper注册中心的信息如下是:

namespace
├──rules # 全局规则配置
├──props # 属性配置
├──metadata # Metadata 配置
├ ├──${databaseName} # 逻辑数据库名称
├ ├ ├──schemas # Schema 列表
├ ├ ├ ├──${schemaName} # 逻辑 Schema 名称
├ ├ ├ ├ ├──tables # 表结构配置
├ ├ ├ ├ ├ ├──${tableName}
├ ├ ├ ├ ├ ├──...
├ ├ ├ ├──...
├ ├ ├──versions # 元数据版本列表
├ ├ ├ ├ ├──views # 视图结构配置
├ ├ ├ ├ ├ ├──${viewName}
├ ├ ├ ├ ├ ├──...
├ ├ ├ ├──${versionNumber} # 元数据版本号
├ ├ ├ ├ ├──dataSources # 数据源配置
├ ├ ├ ├ ├──rules # 规则配置
├ ├ ├ ├──...
├ ├ ├──active_version # 激活的元数据版本号
├ ├──...
├──nodes
├ ├──compute_nodes
├ ├ ├──online
├ ├ ├ ├──proxy
├ ├ ├ ├ ├──UUID # Proxy 实例唯一标识
├ ├ ├ ├ ├──....
├ ├ ├ ├──jdbc
├ ├ ├ ├ ├──UUID # JDBC 实例唯一标识
├ ├ ├ ├ ├──....
├ ├ ├──status
├ ├ ├ ├──UUID
├ ├ ├ ├──....
├ ├ ├──worker_id
├ ├ ├ ├──UUID
├ ├ ├ ├──....
├ ├ ├──process_trigger
├ ├ ├ ├──process_list_id:UUID
├ ├ ├ ├──....
├ ├ ├──labels
├ ├ ├ ├──UUID
├ ├ ├ ├──....
├ ├──storage_nodes
├ ├ ├──${databaseName.groupName.ds}
├ ├ ├──${databaseName.groupName.ds}

而在rules部分,就是我们配置的ShardingProxy的核心属性

- !AUTHORITY
  provider:
    type: ALL_PERMITTED
  users:
  - root@%:root
  - sharding@%:sharding
- !TRANSACTION
  defaultType: XA
  providerType: Atomikos
- !SQL_PARSER
  parseTreeCache:
    initialCapacity: 128
    maximumSize: 1024
  sqlCommentParseEnabled: true
  sqlStatementCache:
    initialCapacity: 2000
    maximumSize: 65535

而分库分表的信息,则配置在/governance_ds/metadata/sharding_db/versions/0/rules节点下

- !SHARDING
  tables:
    # 逻辑表
    sys_user:
      actualDataNodes: ds_${0..1}.sys_user${1..2}
      # 分表策略
      tableStrategy:
        standard:
          shardingColumn: uid
          shardingAlgorithmName: sys_user_tab_alg
      # 分布式主键生成策略
      keyGenerateStrategy:
        column: uid
        keyGeneratorName: alg_snowflake


  # 默认分库策略
  defaultDatabaseStrategy:
    standard:
      shardingColumn: uid
      shardingAlgorithmName: database_inline
  # 默认分表策略
  defaultTableStrategy:
    none:

  # 分片策略
  shardingAlgorithms:
    database_inline:
      type: INLINE
      props:
        algorithm-expression: ds_${uid % 2}
    sys_user_tab_alg:
      type: INLINE
      props:
        algorithm-expression: sys_user$->{((uid+1)%4).intdiv(2)+1}
  
  # 分布式主键生成策略
  keyGenerators:
    alg_snowflake:
      type: COSID_SNOWFLAKE

统一ShardingJDBC和ShardingProxy配置

既然ShardingProxy可以通过Zookeeper同步配置信息,那么我们可不可以在ShardingJDBC中也采用Zookeeper的配置呢?当然是可以的。

通过Zookeeper注册中心同步配置

第一种简单的思路就是将ShardingProxy中的mod部分配置移植到之前的ShardingJDBC示例中。

将application.properties中的配置信息全部删除,只配置Zookeeper地址:

在这里插入图片描述

# 微服务中配置信息如下,如果使用yml配置文件方式就不需要spring.shardingsphere这个前缀,当然使用yml加上这个前缀也能正常运行,只是需要了解这一点
# 如果使用properties就需要再上方配置的基础上加上spring.shardingsphere前缀
spring.shardingsphere.mode.type=Cluster
spring.shardingsphere.mode.repository.type=ZooKeeper
spring.shardingsphere.mode.repository.props.namespace=governance_ds
spring.shardingsphere.mode.repository.props.server-lists=localhost:2181
spring.shardingsphere.mode.repository.props.retryIntervalMilliseconds=600
spring.shardingsphere.mode.repository.props.timeToLiveSecoonds=60
spring.shardingsphere.mode.repository.props.maxRetries=3
spring.shardingsphere.mode.repository.props.operationTimeoutMilliseconds=500

# 指定读取Zookeeper上的哪一个库。默认值是logic_db
# ShardingProxy的配置文件中默认配置的库是sharding_db
# 如果有多个数据库需要读取,用逗号隔开 spring.shardingsphere.database.name=sharding_hs_db,logic_db,sharding_db
spring.shardingsphere.database.name=sharding_hs_db

这里需要注意,如果是使用ShardingJDBC的方式,那么默认是会读取一个logic_db数据库,而ShardingProxy的配置中,默认的数据库名是sharding_db,就会造成微服务端在进行查询更新操作时没有想过的分片策略使用

在这里插入图片描述

org.apache.shardingsphere.mode.metadata.MetaDataContextsFactory#create()

public static MetaDataContexts create(...) throws SQLException {
    // instanceContext.getInstance().getMetaData() 这个方法判断是不是JDBCInstanceMetaData
    // 我们使用ShardingJDBC就是会创建JDBCInstanceMetaData类型
        Collection<String> databaseNames = instanceContext.getInstance().getMetaData() instanceof JDBCInstanceMetaData
            // 是JDBC类型就相当于读取本地的配置 ,当前我们配置文件中只有zookeeper连接的配置信息,所以最终读取到一个默认值logic_db   
            ? parameter.getDatabaseConfigs().keySet()     
             // 而ShardingProxy就相当于是去读取Zookeeper中metadata下的所有数据库   
            : persistService.getDatabaseMetaDataService().loadAllDatabaseNames();
    ......
}

上方中instanceContext.getInstance().getMetaData()值的来源是ShardingSphereDataSource类中

// org.apache.shardingsphere.driver.jdbc.core.datasource.ShardingSphereDataSource类
// 构造方法会调用下面这个方法
private ContextManager createContextManager(...) throws SQLException {
    //  创建JDBCInstanceMetaData
    InstanceMetaData instanceMetaData = InstanceMetaDataBuilderFactory.create("JDBC", -1);
    ......
}

// 进入到create()方法 下面使用SPI机制加载InstanceMetaDataBuilder接口的实现类
// 而InstanceMetaDataBuilder接口的实现类就只有 JDBCInstanceMetaDataBuilder 和  ProxyInstanceMetaDataBuilder
// 对应的就是SharingJDBC和ShardingProxy两种方式
public static InstanceMetaData create(String type, int port) {
       return ((InstanceMetaDataBuilder)TypedSPIRegistry.getRegisteredService(InstanceMetaDataBuilder.class, type)).build(port);
}

上方中parameter.getDatabaseConfigs().keySet()会读取到一个默认值logic_db 对应的源码在DatabaseNameSetter类中

package org.apache.shardingsphere.spring.boot.schema;

public final class DatabaseNameSetter {
    
    private static final String DATABASE_NAME_KEY = "spring.shardingsphere.database.name";
    
    private static final String SCHEMA_NAME_KEY = "spring.shardingsphere.schema.name";
    
    /**
     * Get database name.
     *
     * @param environment spring boot environment
     * @return schema name
     */
    public static String getDatabaseName(final Environment environment) {
        StandardEnvironment standardEnv = (StandardEnvironment) environment;
        // 先读取spring.shardingsphere.database.name配置项的值
        String databaseName = standardEnv.getProperty(DATABASE_NAME_KEY);
        if (!Strings.isNullOrEmpty(databaseName)) {
            return databaseName;
        }
        // 再去读取 spring.shardingsphere.schema.name 配置项的值
        String schemaName = standardEnv.getProperty(SCHEMA_NAME_KEY);
        //  DefaultDatabase.LOGIC_NAME 默认值是 logic_db
        return Strings.isNullOrEmpty(schemaName) ? DefaultDatabase.LOGIC_NAME : schemaName;
    }
}

直接使用ShardingProxy提供的JDBC驱动读取配置文件

ShardingSphere还提供了自己的JDBC驱动

在我们的微服务中 classpath下增加一个config.yaml,然后将我们之前在ShardingProxy中的几个关键配置整合到一起

databaseName: sharding_hs_db

dataSources:
  ds_0:
    url: jdbc:mysql://localhost:3306/sharding_sphere1?serverTimezone=UTC&useSSL=false
    username: root
    password: 1234
    connectionTimeoutMilliseconds: 30000
    idleTimeoutMilliseconds: 60000
    maxLifetimeMilliseconds: 1800000
    maxPoolSize: 50
    minPoolSize: 1
  ds_1:
    url: jdbc:mysql://localhost:3306/sharding_sphere2?serverTimezone=UTC&useSSL=false
    username: root
    password: 1234
    connectionTimeoutMilliseconds: 30000
    idleTimeoutMilliseconds: 60000
    maxLifetimeMilliseconds: 1800000
    maxPoolSize: 50
    minPoolSize: 1

rules:
- !SHARDING
  tables:
    # 逻辑表
    sys_user:
      actualDataNodes: ds_${0..1}.sys_user${1..2}
      # 分表策略
      tableStrategy:
        standard:
          shardingColumn: uid
          shardingAlgorithmName: sys_user_tab_alg
      # 分布式主键生成策略
      keyGenerateStrategy:
        column: uid
        keyGeneratorName: alg_snowflake


  # 默认分库策略
  defaultDatabaseStrategy:
    standard:
      shardingColumn: uid
      shardingAlgorithmName: database_inline
  # 默认分表策略
  defaultTableStrategy:
    none:

  # 分片策略
  shardingAlgorithms:
    database_inline:
      type: INLINE
      props:
        algorithm-expression: ds_${uid % 2}
    sys_user_tab_alg:
      type: INLINE
      props:
        algorithm-expression: sys_user$->{((uid+1)%4).intdiv(2)+1}
  
  # 分布式主键生成策略
  keyGenerators:
    alg_snowflake:
      type: COSID_SNOWFLAKE

# 注意,下方rules需要注释掉
rules:
  - !AUTHORITY
    users:
      - root@%:root
      - sharding@:sharding
    provider:
      type: ALL_PERMITTED
  - !TRANSACTION
    defaultType: XA
    providerType: Atomikos
  - !SQL_PARSER
    sqlCommentParseEnabled: true
    sqlStatementCache:
      initialCapacity: 2000
      maximumSize: 65535
    parseTreeCache:
      initialCapacity: 128
      maximumSize: 1024
      
props:
  max-connections-size-per-query: 1
  kernel-executor-size: 16  # Infinite by default.
  proxy-frontend-flush-threshold: 128  # The default value is 128.
  proxy-hint-enabled: false
  sql-show: false
  check-table-metadata-enabled: false
    # Proxy backend query fetch size. A larger value may increase the memory usage of ShardingSphere Proxy.
    # The default value is -1, which means set the minimum value for different JDBC drivers.
  proxy-backend-query-fetch-size: -1
  proxy-frontend-executor-size: 0 # Proxy frontend executor size. The default value is 0, which means let Netty decide.
    # Available options of proxy backend executor suitable: OLAP(default), OLTP. The OLTP option may reduce time cost of writing packets to client, but it may increase the latency of SQL execution
    # and block other clients if client connections are more than `proxy-frontend-executor-size`, especially executing slow SQL.
  proxy-backend-executor-suitable: OLAP
  proxy-frontend-max-connections: 0 # Less than or equal to 0 means no limitation.
    # Available sql federation type: NONE (default), ORIGINAL, ADVANCED
  sql-federation-type: NONE
    # Available proxy backend driver type: JDBC (default), ExperimentalVertx
  proxy-backend-driver-type: JDBC
  proxy-mysql-default-version: 8.0.15 # In the absence of schema name, the default version will be used.
  proxy-default-port: 3307 # Proxy default port.
  proxy-netty-backlog: 1024 # Proxy netty backlog.

然后,可以直接用JDBC的方式访问带有分库分表的虚拟库。

public class ShardingJDBCDriverTest {

    @Test
    public void test() throws ClassNotFoundException, SQLException {
        String jdbcDriver = "org.apache.shardingsphere.driver.ShardingSphereDriver";
        String jdbcUrl = "jdbc:shardingsphere:classpath:config.yaml";
        String sql = "select * from sharding_hs_db.sys_user";

        Class.forName(jdbcDriver);
        
        try(Connection connection = DriverManager.getConnection(jdbcUrl);) {
            Statement statement = connection.createStatement();
            ResultSet resultSet = statement.executeQuery(sql);
            while (resultSet.next()){
                System.out.println("uid= "+resultSet.getLong("uid"));
            }
        }
    }
}

启动时,发现报错了

Caused by: java.lang.IllegalStateException: dataSourceClassName can not be null.

在这里插入图片描述

看样子直接把配置从ShardingProxy中复制过来有点小问题,那边是不需要加的。

所以需要再现有数据源配置上加上dataSourceClassName的配置,我这里先是使用com.mysql.cj.jdbc.Driver

在这里插入图片描述

结果报错了

java.lang.ClassCastException: com.mysql.cj.jdbc.Driver cannot be cast to javax.sql.DataSource

在这里插入图片描述

再通过查看开发手册,修改成了com.zaxxer.hikari.HikariDataSource

在这里插入图片描述

此时又报了新的错误

java.lang.NullPointerException: Can not find transaction manager of `XA`

在这里插入图片描述

接下来再解决XA事务管理器相关的问题,因为ShardingProxy默认 XA事务管理器 使用的是 Atomikos ,我们上方config.yaml配置文件中也是这个配置。所以我接下里导入相关依赖

<!--XA 分布式事务 -->
<dependency>
    <groupId>org.apache.shardingsphere</groupId>
    <artifactId>shardingsphere-transaction-xa-core</artifactId>
    <!-- 因为文本使用的是ShardingSphere 5.2.1这里和总版本对应上 -->
    <version>5.2.1</version>
</dependency>

又报错了

Failed to instantiate [javax.sql.DataSource]: Factory method 'shardingSphereDataSource' threw exception; nested exception is java.lang.AbstractMethodError: com.atomikos.icatch.jta.JtaTransactionServicePlugin.beforeInit()V

在这里插入图片描述

从报错信息可以看出来是Atomikos源码包中有问题,接下来在进行解决

最终导入的依赖如下

<!--XA 分布式事务 -->
<dependency>
    <groupId>org.apache.shardingsphere</groupId>
    <artifactId>shardingsphere-transaction-xa-core</artifactId>
    <version>5.2.1</version>
    <exclusions>
        <exclusion>
            <artifactId>transactions-jdbc</artifactId>
            <groupId>com.atomikos</groupId>
        </exclusion>
        <exclusion>
            <artifactId>transactions-jta</artifactId>
            <groupId>com.atomikos</groupId>
        </exclusion>
    </exclusions>
</dependency>
<!-- 版本滞后了 -->
<dependency>
    <artifactId>transactions-jdbc</artifactId>
    <groupId>com.atomikos</groupId>
    <version>5.0.8</version>
</dependency>
<dependency>
    <artifactId>transactions-jta</artifactId>
    <groupId>com.atomikos</groupId>
    <version>5.0.8</version>
</dependency>

终于执行通过

在这里插入图片描述

下方这种测试方法也可以

public class ShardingSphereDatasourceTest {

    public static void main(String[] args) throws SQLException, ClassNotFoundException {
        HikariDataSource dataSource = new HikariDataSource();
        dataSource.setDriverClassName("org.apache.shardingsphere.driver.ShardingSphereDriver");
        dataSource.setJdbcUrl("jdbc:shardingsphere:classpath:config.yaml");

//        Class.forName("org.apache.shardingsphere.driver.ShardingSphereDriver");
//        String jdbcUrl = "jdbc:shardingsphere:classpath:config.yaml";
//        Connection conn = DriverManager.getConnection(jdbcUrl);

        Connection conn = dataSource.getConnection();

        String sql = "SELECT cid,cname,user_id,cstatus from course where cid=851198093910081536";
        try {
            //ShardingConnectioin
            conn = dataSource.getConnection();
            //ShardingStatement
            Statement statement = conn.createStatement();
            //ShardingResultSet
            ResultSet result = statement.executeQuery(sql);
            while (result.next()) {
                System.out.println("result:" + result);
            }
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            if (null != conn) {
                conn.close();
            }
        }
    }
}
Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐