SpringCloud源码分析 (Eureka-Server-处理客户端删除状态请求) (六)
文章目录
- 1.处理客户端删除状态请求
- 1.1 InstanceResource.deleteStatusUpdate()
- 1.2 PeerAwareInstanceRegistryImpl.deleteStatusOverride()
- 1.3 AbstractInstanceRegistry.deleteStatusOverride()
- 1.4 PeerAwareInstanceRegistryImpl.replicateToPeers()
1.处理客户端删除状态请求
一旦客户端发起了删除状态请求,服务端就不再接受来自该客户端的心跳请求。
1.1 InstanceResource.deleteStatusUpdate()
/**
 * Handles a request to remove the status override of the instance addressed by this resource.
 *
 * @param isReplication      value of the replication header; only the literal string "true"
 *                           is treated as a server-to-server replication request
 * @param newStatusValue     name of the status to apply after the override is removed;
 *                           {@code null} is mapped to {@code InstanceStatus.UNKNOWN}
 * @param lastDirtyTimestamp last-modified timestamp supplied by the caller, forwarded
 *                           unchanged to the registry
 * @return an OK response when the override was removed, NOT_FOUND when the instance is not
 *         in the registry, and a server-error response in every other case
 */
@DELETE
@Path("status")
public Response deleteStatusUpdate(
        @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
        @QueryParam("value") String newStatusValue,
        @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
    try {
        // Look the instance up by application name and instance id; bail out if unknown.
        if (registry.getInstanceByAppAndId(app.getName(), id) == null) {
            logger.warn("Instance not found: {}/{}", app.getName(), id);
            return Response.status(Status.NOT_FOUND).build();
        }
        // Defensive default: per the original walkthrough, clients do not send a status
        // with this request, so newStatusValue is normally null here.
        InstanceStatus newStatus = newStatusValue == null
                ? InstanceStatus.UNKNOWN
                : InstanceStatus.valueOf(newStatusValue);
        // Remove the overridden status.
        boolean isSuccess = registry.deleteStatusOverride(app.getName(), id,
                newStatus, lastDirtyTimestamp, "true".equals(isReplication));
        if (isSuccess) {
            logger.info("Status override removed: {} - {}", app.getName(), id);
            return Response.ok().build();
        } else {
            logger.warn("Unable to remove status override: {} - {}", app.getName(), id);
            return Response.serverError().build();
        }
    } catch (Throwable e) {
        // Pass the throwable as the last argument so the cause and stack trace are logged
        // instead of being silently dropped.
        logger.error("Error removing instance's {} status override", id, e);
        return Response.serverError().build();
    }
}
1.2 PeerAwareInstanceRegistryImpl.deleteStatusOverride()
删除覆盖状态方法
/**
 * Removes the status override in the local registry and, if that succeeded, replicates the
 * removal to the peer nodes.
 *
 * @return {@code true} when the local removal succeeded, {@code false} otherwise
 */
@Override
public boolean deleteStatusOverride(String appName, String id,
                                    InstanceStatus newStatus,
                                    String lastDirtyTimestamp,
                                    boolean isReplication) {
    // Let the superclass apply the removal to the local registry first.
    boolean removedLocally =
            super.deleteStatusOverride(appName, id, newStatus, lastDirtyTimestamp, isReplication);
    if (!removedLocally) {
        return false;
    }
    // Local removal worked: hand the same action to the replication routine.
    replicateToPeers(Action.DeleteStatusOverride, appName, id, null, null, isReplication);
    return true;
}
1.3 AbstractInstanceRegistry.deleteStatusOverride()
本地删除状态
/**
 * Removes the status override of an instance from the local registry.
 *
 * @param appName            application name used to look up the lease map
 * @param id                 instance id used to look up the lease and the override entry
 * @param newStatus          status written to the instance once the override is removed
 * @param lastDirtyTimestamp caller's dirty timestamp as a decimal string, may be {@code null}
 * @param isReplication      forwarded to the STATUS_OVERRIDE_DELETE counter
 * @return {@code false} when no lease exists for the given app/id, {@code true} otherwise
 * @throws NumberFormatException if {@code lastDirtyTimestamp} is non-null and not a valid long
 */
@Override
public boolean deleteStatusOverride(String appName, String id,
                                    InstanceStatus newStatus,
                                    String lastDirtyTimestamp,
                                    boolean isReplication) {
    // Take the read lock BEFORE entering try: if acquisition fails, finally must not
    // attempt to unlock a lock this thread never obtained.
    read.lock();
    try {
        STATUS_OVERRIDE_DELETE.increment(isReplication);
        // Look up the lease of the instance in the registry.
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> lease = null;
        if (gMap != null) {
            lease = gMap.get(id);
        }
        if (lease == null) {
            return false;
        }
        // Refresh the lease's renewal time.
        lease.renew();
        InstanceInfo info = lease.getHolder();
        // Lease is always created with its instance info object.
        // This log statement is provided as a safeguard, in case this invariant is violated.
        if (info == null) {
            logger.error("Found Lease without a holder for instance id {}", id);
        }
        // Drop the override recorded for this instance id.
        InstanceStatus currentOverride = overriddenInstanceStatusMap.remove(id);
        if (currentOverride != null && info != null) {
            // The instance no longer carries an override.
            info.setOverriddenStatus(InstanceStatus.UNKNOWN);
            // Apply the new status without touching the dirty timestamp.
            info.setStatusWithoutDirty(newStatus);
            long replicaDirtyTimestamp = 0;
            if (lastDirtyTimestamp != null) {
                // parseLong yields a primitive directly (no boxing round trip).
                replicaDirtyTimestamp = Long.parseLong(lastDirtyTimestamp);
            }
            // If the replication's dirty timestamp is more than the existing one, just update
            // it to the replica's.
            if (replicaDirtyTimestamp > info.getLastDirtyTimestamp()) {
                info.setLastDirtyTimestamp(replicaDirtyTimestamp);
            }
            // Mark the change as a modification.
            info.setActionType(ActionType.MODIFIED);
            // Record it in the recently-changed queue.
            recentlyChangedQueue.add(new RecentlyChangedItem(lease));
            // Update the server-side last-updated timestamp.
            info.setLastUpdatedTimestamp();
            // Invalidate cached responses for this app and its VIP addresses.
            invalidateCache(appName, info.getVIPAddress(), info.getSecureVipAddress());
        }
        return true;
    } finally {
        read.unlock();
    }
}
1.4 PeerAwareInstanceRegistryImpl.replicateToPeers()
删除状态操作同步给集群中其他节点
/**
 * Replicates all eureka actions to peer eureka nodes except for replication
 * traffic to this node.
 *
 * @param action        the action to replicate; its timer measures this call
 * @param appName       application name of the affected instance
 * @param id            instance id of the affected instance
 * @param info          instance info, optional
 * @param newStatus     new status, optional
 * @param isReplication whether the triggering request was itself a replication request
 */
private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch timer = action.getTimer().start();
    try {
        // Count incoming server-to-server replication requests. In the client-initiated
        // flow described by this walkthrough the flag is false.
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }
        // If it is a replication already, do not replicate again as this will create a
        // poison replication. peerEurekaNodes represents the cluster of this Eureka server.
        boolean noPeers = peerEurekaNodes == Collections.EMPTY_LIST;
        if (noPeers || isReplication) {
            return;
        }
        // Walk every node of the cluster.
        for (final PeerEurekaNode peer : peerEurekaNodes.getPeerEurekaNodes()) {
            // If the url represents this host, do not replicate to yourself.
            if (!peerEurekaNodes.isThisMyUrl(peer.getServiceUrl())) {
                // Forward the action to the other eureka node.
                replicateInstanceActionsToPeers(action, appName, id, info, newStatus, peer);
            }
        }
    } finally {
        timer.stop();
    }
}
replicateInstanceActionsToPeers()
/**
 * Sends a single action to one peer node by calling the matching method on that node.
 * Any failure is logged and not propagated to the caller.
 */
private void replicateInstanceActionsToPeers(Action action, String appName,
                                             String id, InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
    try {
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
            case Cancel: {
                node.cancel(appName, id);
                break;
            }
            case Heartbeat: {
                // Read the override first, then the registry copy, as the heartbeat needs both.
                InstanceStatus override = overriddenInstanceStatusMap.get(id);
                InstanceInfo registered = getInstanceByAppAndId(appName, id, false);
                node.heartbeat(appName, id, registered, override, false);
                break;
            }
            case Register: {
                node.register(info);
                break;
            }
            case StatusUpdate: {
                InstanceInfo registered = getInstanceByAppAndId(appName, id, false);
                node.statusUpdate(appName, id, newStatus, registered);
                break;
            }
            case DeleteStatusOverride: {
                InstanceInfo registered = getInstanceByAppAndId(appName, id, false);
                node.deleteStatusOverride(appName, id, registered);
                break;
            }
        }
    } catch (Throwable failure) {
        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), failure);
    }
}
AbstractJerseyEurekaHttpClient.cancel()
AbstractJerseyEurekaHttpClient.sendHeartBeat()
AbstractJerseyEurekaHttpClient.register()
AbstractJerseyEurekaHttpClient.statusUpdate()
AbstractJerseyEurekaHttpClient.deleteStatusOverride()