Test Executor的代码位于src/examples/test_executor.cpp中,其main函数如下:
-
int main(int argc, char** argv)
-
{
-
TestExecutor executor;
-
MesosExecutorDriver driver(&executor);
-
return driver.run() == DRIVER_STOPPED ? 0 : 1;
-
}
|
Executor的运行主要依赖MesosExecutorDriver:它封装了与mesos-slave之间的通信。
MesosExecutorDriver的实现位于src/exec/exec.cpp中。
-
// Starts the driver and, if it reached DRIVER_RUNNING, delegates to
// join(). When start() reports any other status, that status is
// returned directly and join() is not called.
Status MesosExecutorDriver::run()
{
  const Status started = start();

  if (started == DRIVER_RUNNING) {
    return join();
  }

  return started;
}
|
-
Status MesosExecutorDriver::start()
-
{
-
synchronized (mutex) {
-
if (status != DRIVER_NOT_STARTED) {
-
return status;
-
}
-
-
// Set stream buffering mode to flush on newlines so that we
-
// capture logs from user processes even when output is redirected
-
// to a file.
-
setvbuf(stdout, 0, _IOLBF, 0);
-
setvbuf(stderr, 0, _IOLBF, 0);
-
-
bool local;
-
-
UPID slave;
-
SlaveID slaveId;
-
FrameworkID frameworkId;
-
ExecutorID executorId;
-
string workDirectory;
-
bool checkpoint;
-
-
Option<string> value;
-
std::istringstream iss;
-
-
// Check if this is local (for example, for testing).
-
local = os::getenv("MESOS_LOCAL").isSome();
-
-
// Get slave PID from environment.
-
value = os::getenv("MESOS_SLAVE_PID");
-
if (value.isNone()) {
-
EXIT(EXIT_FAILURE)
-
<< "Expecting 'MESOS_SLAVE_PID' to be set in the environment";
-
}
-
-
slave = UPID(value.get());
-
CHECK(slave) << "Cannot parse MESOS_SLAVE_PID '" << value.get() << "'";
-
-
// Get slave ID from environment.
-
value = os::getenv("MESOS_SLAVE_ID");
-
if (value.isNone()) {
-
EXIT(EXIT_FAILURE)
-
<< "Expecting 'MESOS_SLAVE_ID' to be set in the environment";
-
}
-
slaveId.set_value(value.get());
-
-
// Get framework ID from environment.
-
value = os::getenv("MESOS_FRAMEWORK_ID");
-
if (value.isNone()) {
-
EXIT(EXIT_FAILURE)
-
<< "Expecting 'MESOS_FRAMEWORK_ID' to be set in the environment";
-
}
-
frameworkId.set_value(value.get());
-
-
// Get executor ID from environment.
-
value = os::getenv("MESOS_EXECUTOR_ID");
-
if (value.isNone()) {
-
EXIT(EXIT_FAILURE)
-
<< "Expecting 'MESOS_EXECUTOR_ID' to be set in the environment";
-
}
-
executorId.set_value(value.get());
-
-
// Get working directory from environment.
-
value = os::getenv("MESOS_DIRECTORY");
-
if (value.isNone()) {
-
EXIT(EXIT_FAILURE)
-
<< "Expecting 'MESOS_DIRECTORY' to be set in the environment";
-
}
-
workDirectory = value.get();
-
-
// Get checkpointing status from environment.
-
value = os::getenv("MESOS_CHECKPOINT");
-
checkpoint = value.isSome() && value.get() == "1";
-
-
Duration recoveryTimeout = RECOVERY_TIMEOUT;
-
-
// Get the recovery timeout if checkpointing is enabled.
-
if (checkpoint) {
-
value = os::getenv("MESOS_RECOVERY_TIMEOUT");
-
-
if (value.isSome()) {
-
Try<Duration> _recoveryTimeout = Duration::parse(value.get());
-
-
if (_recoveryTimeout.isError()) {
-
EXIT(EXIT_FAILURE)
-
<< "Cannot parse MESOS_RECOVERY_TIMEOUT '" << value.get() << "': "
-
<< _recoveryTimeout.error();
-
}
-
-
recoveryTimeout = _recoveryTimeout.get();
-
}
-
}
-
-
CHECK(process == NULL);
-
-
process = new ExecutorProcess(
-
slave,
-
this,
-
executor,
-
slaveId,
-
frameworkId,
-
executorId,
-
local,
-
workDirectory,
-
checkpoint,
-
recoveryTimeout,
-
&mutex,
-
latch);
-
-
spawn(process);
-
-
return status = DRIVER_RUNNING;
-
}
-
}
|
start()最后会创建ExecutorProcess并调用spawn启动它(在另一个线程中运行)。它的构造函数如下,其中通过install注册了多个消息处理函数。
-
ExecutorProcess(const UPID& _slave,
-
MesosExecutorDriver* _driver,
-
Executor* _executor,
-
const SlaveID& _slaveId,
-
const FrameworkID& _frameworkId,
-
const ExecutorID& _executorId,
-
bool _local,
-
const string& _directory,
-
bool _checkpoint,
-
Duration _recoveryTimeout,
-
std::recursive_mutex* _mutex,
-
Latch* _latch)
-
: ProcessBase(ID::generate("executor")),
-
slave(_slave),
-
driver(_driver),
-
executor(_executor),
-
slaveId(_slaveId),
-
frameworkId(_frameworkId),
-
executorId(_executorId),
-
connected(false),
-
connection(UUID::random()),
-
local(_local),
-
aborted(false),
-
mutex(_mutex),
-
latch(_latch),
-
directory(_directory),
-
checkpoint(_checkpoint),
-
recoveryTimeout(_recoveryTimeout)
-
{
-
LOG(INFO) << "Version: " << MESOS_VERSION;
-
-
install<ExecutorRegisteredMessage>(
-
&ExecutorProcess::registered,
-
&ExecutorRegisteredMessage::executor_info,
-
&ExecutorRegisteredMessage::framework_id,
-
&ExecutorRegisteredMessage::framework_info,
-
&ExecutorRegisteredMessage::slave_id,
-
&ExecutorRegisteredMessage::slave_info);
-
-
install<ExecutorReregisteredMessage>(
-
&ExecutorProcess::reregistered,
-
&ExecutorReregisteredMessage::slave_id,
-
&ExecutorReregisteredMessage::slave_info);
-
-
install<ReconnectExecutorMessage>(
-
&ExecutorProcess::reconnect,
-
&ReconnectExecutorMessage::slave_id);
-
-
install<RunTaskMessage>(
-
&ExecutorProcess::runTask,
-
&RunTaskMessage::task);
-
-
install<KillTaskMessage>(
-
&ExecutorProcess::killTask,
-
&KillTaskMessage::task_id);
-
-
install<StatusUpdateAcknowledgementMessage>(
-
&ExecutorProcess::statusUpdateAcknowledgement,
-
&StatusUpdateAcknowledgementMessage::slave_id,
-
&StatusUpdateAcknowledgementMessage::framework_id,
-
&StatusUpdateAcknowledgementMessage::task_id,
-
&StatusUpdateAcknowledgementMessage::uuid);
-
-
install<FrameworkToExecutorMessage>(
-
&ExecutorProcess::frameworkMessage,
-
&FrameworkToExecutorMessage::slave_id,
-
&FrameworkToExecutorMessage::framework_id,
-
&FrameworkToExecutorMessage::executor_id,
-
&FrameworkToExecutorMessage::data);
-
-
install<ShutdownExecutorMessage>(
-
&ExecutorProcess::shutdown);
-
}
|
在ExecutorProcess的initialize函数中,向mesos-slave发送RegisterExecutorMessage消息进行注册。
-
virtual void initialize()
-
{
-
VLOG(1) << "Executor started at: " << self()
-
<< " with pid " << getpid();
-
-
link(slave);
-
-
// Register with slave.
-
RegisterExecutorMessage message;
-
message.mutable_framework_id()->MergeFrom(frameworkId);
-
message.mutable_executor_id()->MergeFrom(executorId);
-
send(slave, message);
-
}
|
当mesos-slave向TestExecutor发送RunTaskMessage消息时,ExecutorProcess会调用runTask函数。
-
void runTask(const TaskInfo& task)
-
{
-
if (aborted.load()) {
-
VLOG(1) << "Ignoring run task message for task " << task.task_id()
-
<< " because the driver is aborted!";
-
return;
-
}
-
-
CHECK(!tasks.contains(task.task_id()))
-
<< "Unexpected duplicate task " << task.task_id();
-
-
tasks[task.task_id()] = task;
-
-
VLOG(1) << "Executor asked to run task '" << task.task_id() << "'";
-
-
Stopwatch stopwatch;
-
if (FLAGS_v >= 1) {
-
stopwatch.start();
-
}
-
-
executor->launchTask(driver, task);
-
-
VLOG(1) << "Executor::launchTask took " << stopwatch.elapsed();
-
}
|
runTask最终调用executor的launchTask;对于TestExecutor,其实现位于src/examples/test_executor.cpp中:
-
// Test implementation of the launch callback: it performs no real
// work. It prints a start line, reports TASK_RUNNING for the task,
// prints a finish line, and then reports TASK_FINISHED, sending both
// updates through `driver`.
virtual void launchTask(ExecutorDriver* driver, const TaskInfo& task)
{
  const auto& taskId = task.task_id();

  cout << "Starting task " << taskId.value() << endl;

  // A single status object is reused for both updates.
  TaskStatus update;
  update.mutable_task_id()->MergeFrom(taskId);
  update.set_state(TASK_RUNNING);
  driver->sendStatusUpdate(update);

  // This is where one would perform the requested task.

  cout << "Finishing task " << taskId.value() << endl;

  update.mutable_task_id()->MergeFrom(taskId);
  update.set_state(TASK_FINISHED);
  driver->sendStatusUpdate(update);
}
|