start-thriftserver.sh启动thrift server,入口是org.apache.spark.sql.hive.thriftserver.HiveThriftServer2。Spark sql的一些背景说明可以参见这里

HiveThriftServer2

  def main(args: Array[String]) {
    logInfo("Starting SparkContext")
    SparkSQLEnv.init()

init中创建了SparkContext和HiveContext,并且在sc中增加了listener,用来记录每个stage结束、的信息,代码如下。

  var hiveContext: HiveContext = _
  var sparkContext: SparkContext = _

  def init() {
    if (hiveContext == null) {
..
      sparkContext = new SparkContext(sparkConf)
      sparkContext.addSparkListener(new StatsReportListener())
      hiveContext = new HiveContext(sparkContext)

继续,创建hivethriftserver2。主要是cliService,1.6版本可以是http或bin模式,用来给beeline提供服务。ThriftHttpCLIService/ThriftBinaryCLIService只是个通道(要么TCP形式,要么http形式的),服务实际是SparkSQLCLIService提供的。server.init里会将2个service都加到服务链表里,server.start时启动。

这里又给sc加了个listener(可以向注册多个listener,会广播):HiveThriftServer2Listener,它也是继承自SparkListener,跟StatsReportListener记录通用信息不同,它会记录每个sql statement的时间、状态等信息。 thrift server启动后会向spark ui注册一个TAB:“JDBC/ODBC Server”。

    try {
      val server = new HiveThriftServer2(SparkSQLEnv.hiveContext)
      server.init(SparkSQLEnv.hiveContext.hiveconf)
      server.start()
      logInfo("HiveThriftServer2 started")
      listener = new HiveThriftServer2Listener(server, SparkSQLEnv.hiveContext.conf)
      SparkSQLEnv.sparkContext.addSparkListener(listener)
      uiTab = if (SparkSQLEnv.sparkContext.getConf.getBoolean("spark.ui.enabled", true)) {
        Some(new ThriftServerTab(SparkSQLEnv.sparkContext))

对于客户端来说,其实真正提供服务的是SparkSQLCLIService,下面来看看一个beeline是怎么连上来做事的。

SparkSQLSessionManager

SparkSQLCLIService中初始化了backgroundOperationPool(后台op池,后面会用到),并addServer了SparkSQLSessionManager,后者管理sts的会话,例如会在用户登录的时候,新建会话,用户退出关闭会话。SessionManager初始化时,会注册SparkSQLOperationManager,它用来管理:1、会话和hiveContext的关系,根据会话可以找到其hc;2、代替hive的OperationManager管理句柄和operation的关系。 注意Operation只有我们执行SQL的时候才会new一个。

thriftserver可以配置为单会话,即所有beeline共享一个hiveContext,也可以配置为新起一个会话,每个会话独享HiveContext,这样可以获得独立的UDF/UDAF,临时表,会话状态等。默认是新起会话。

新建会话的时候,会将会话和hiveContext的对应关系添加到OperationManager的sessionToContexts这个Map。

只看thrift server的代码还不够,因为它只是“替父出征”,真正在幕后做调度的其实还是被extend的hive的代码,二者要结合来看。以一次SQL执行来看:

CLIService根据句柄找到Session并执行statement

public class CLIService extends CompositeService implements ICLIService {
  @Override
  public OperationHandle executeStatement(SessionHandle sessionHandle, String statement,
      Map<String, String> confOverlay)
          throws HiveSQLException {
    OperationHandle opHandle = sessionManager.getSession(sessionHandle)
        .executeStatement(statement, confOverlay);
    LOG.debug(sessionHandle + ": executeStatement()");
    return opHandle;
  }

每次statement执行,都是新申请的operation,都会加到OperationManager去管理。newExecuteStatementOperation被SparkSQLOperationManager.newExecuteStatementOperation覆盖了,创建的operation实际是SparkExecuteStatementOperation。

public class HiveSessionImpl implements HiveSession {
  @Override
  public OperationHandle executeStatement(String statement, Map<String, String> confOverlay)
      throws HiveSQLException {
    return executeStatementInternal(statement, confOverlay, false);
  }

 private OperationHandle executeStatementInternal(String statement, Map<String, String> confOverlay,
      boolean runAsync)
          throws HiveSQLException {
    acquire(true);

    OperationManager operationManager = getOperationManager();
    ExecuteStatementOperation operation = operationManager
        .newExecuteStatementOperation(getSession(), statement, confOverlay, runAsync);
    OperationHandle opHandle = operation.getHandle();
    try {
      operation.run();
      opHandleSet.add(opHandle);
      return opHandle;
    } catch (HiveSQLException e) {
..
  }

operation.run的执行过程:

public abstract class Operation {
  public void run() throws HiveSQLException {
    beforeRun();
    try {
      runInternal();
    } finally {
      afterRun();
    }
  }

前面把cliservice、session、operation、statement理清楚了,下面来看看具体的statement执行者SparkExecuteStatementOperation。

如前所述,一个Operation分三步:beforeRun、runInternal、afterRun,两头现在只是用来记录日志,直接看runInternal。默认为异步,即后台执行,SparkExecuteStatementOperation创建一个Runable对象,并将其提交到里面backgroundOperationPool,新起一个线程来做excute。

excute中最主要的是hiveContext.sql(statement),其他一些管理事务,如通知listener记录statement的信息,设置operation的状态。

综上,spark thriftserver在hiveContext上套了一层友好的SQL命令行的壳,用户只要通过beeline或JDBC执行SQL即可,不需要看到spark SQL内部的细节(例如hiveContext、sqlContext)。盗用下别人的图,关系整理的很好:

spark-hive-thriftserver里还有个SparkSQLCLIDriver,是在spark-sql里用的,现在还用不到。