侵权投诉
搜索
更多>> 热门搜索:
订阅
纠错
加入自媒体

技术文章:分布式系统模式9-Single Socket Channel

2020-12-28 10:30
java达人
关注

作者: Unmesh Joshi

译者: java达人

 通过使用一个TCP连接来维护发送到服务器的请求的顺序。

问题

当我们使用领导者和追随者模式时,我们需要确保领导者和每个追随者之间的信息保持有序,并对丢失的信息进行重试。与此同时保持较低的新连接成本,这样打开新连接不会增加系统的延迟。

解决方案

幸运的是,长期广泛使用的TCP机制提供了所有这些必要的特征。因此,我们可以通过保证一个follower和leader之间的所有通信都通过一个套接字通道来实现我们需要的通信。然后追随者使用一个Singular Update Queue序列化来自leader的更新

节点在连接打开后永远不会关闭它,并持续读取新请求。节点对每个连接使用一个专用线程来读写请求。如果使用了非阻塞io,则不需要每个连接一个线程。一个简单的基于线程的实现如下:

class SocketHandlerThread…
 @Override  public void run() {      try {          //Continues to read/write to the socket connection till it is closed.          while (true) {              handleRequest();          }      } catch (Exception e) {          getLogger().debug(e);      }  }
  private void handleRequest() {      RequestOrResponse request = readRequestFrom(clientSocket);      RequestId requestId = RequestId.valueOf(request.getRequestId());      requestConsumer.accept(new Message<>(request, requestId, clientSocket));    }

节点读取请求并将它们提交给一个单一的更新队列进行处理。一旦节点处理了请求,它就将响应写回套接字。

每当节点建立通信时,它就会打开一个套接字连接,用于与另一方的所有请求。

class SingleSocketChannel…
 public class SingleSocketChannel implements Closeable {      final InetAddressAndPort address;      final int heartbeatIntervalMs;      private Socket clientSocket;      private final OutputStream socketOutputStream;      private final InputStream inputStream;
     public SingleSocketChannel(InetAddressAndPort address, int heartbeatIntervalMs) throws IOException {          this.address = address;          this.heartbeatIntervalMs = heartbeatIntervalMs;          clientSocket = new Socket();          clientSocket.connect(new InetSocketAddress(address.getAddress(), address.getPort()), heartbeatIntervalMs);          clientSocket.setSoTimeout(heartbeatIntervalMs * 10); //set socket read timeout to be more than heartbeat.          socketOutputStream = clientSocket.getOutputStream();          inputStream = clientSocket.getInputStream();      }
     public synchronized RequestOrResponse blockingSend(RequestOrResponse request) throws IOException {          writeRequest(request);          byte[] responseBytes = readResponse();          return deserialize(responseBytes);      }
     private void writeRequest(RequestOrResponse request) throws IOException {          var dataStream = new DataOutputStream(socketOutputStream);          byte[] messageBytes = serialize(request);          dataStream.writeInt(messageBytes.length);          dataStream.write(messageBytes);      }

在连接上保持一个超时是很重要的,这样在出现错误时它就不会无限期地阻塞。我们使用HeartBeat机制,定期通过套接字通道发送请求,以使其保持活动状态。这个超时时间通常为心跳间隔的倍数,包含网络往返时间和一些可能的网络延迟。将连接超时设置为心跳间隔的10倍是合理的。

class SocketListener…
 private void setReadTimeout(Socket clientSocket) throws SocketException {      clientSocket.setSoTimeout(config.getHeartBeatIntervalMs() * 10);  }

通过单个通道发送请求可能会产生头部阻塞问题。为了避免这些问题,我们可以使用Request Pipeline。

例子

?Zookeeper使用一个套接字通道和每个追随者一个线程来完成所有的通信。?Kafka在follower和leader分区之间使用单个套接字通道来复制消息。?参考Raft共识算法的实现,LogCabin使用单套接字通道在领导者和追随者之间进行通信

声明: 本文由入驻维科号的作者撰写,观点仅代表作者本人,不代表OFweek立场。如有侵权或其他问题,请联系举报。

发表评论

0条评论,0人参与

请输入评论内容...

请输入评论/评论长度6~500个字

您提交的评论过于频繁,请输入验证码继续

暂无评论

暂无评论

    通信 猎头职位 更多
    文章纠错
    x
    *文字标题:
    *纠错内容:
    联系邮箱:
    *验 证 码:

    粤公网安备 44030502002758号