Golang实现支持中间件的简易TCP框架

在golang的标准库中没有为tcp直接提供像http那样简单易用的服务框架,我们不妨自己手动实现一个

主体思路

我们的实现的主题思路分为以下四个内容

  1. 监听服务
  2. 获取构建新连接对象并设置超时时间及keepalive
  3. 设置方法退出时连接关闭
  4. 调用回调接口 TcpHandler

主要结构体和接口

首先是TCPServer的结构体,我们希望用户可以自由构建TcpServer并设置超时时间等自定义选项

type TcpServer struct {
   Addr    string
   Handler TCPHandler  <- 对外提供的服务方法接口
   err     error
   BaseCtx context.Context

   WriteTimeout     time.Duration
   ReadTimeout      time.Duration
   KeepAliveTimeout time.Duration

   mu         sync.Mutex
   inShutdown int32
   doneChan   chan struct{}
   l          *onceCloseListener
}

像httpHandler一样,对外提供抽象的ServeTCP方法

type TCPHandler interface {
   ServeTCP(ctx context.Context, conn net.Conn)
}

服务启动方法

用户可以通过自行构建TcpServer实例再通过ListenAndServe()调用服务,或通过tcp.ListenAndServe(":8080", handler) 使用默认的TcpServer实例快速启动服务。

ListenAndServe() 方法中,进行参数的校验和初始化操作

Serve(l net.Listener) 方法中,通过 l.Accept() 接收信息,包装接收到的conn并另起一个协程处理服务

func ListenAndServe(addr string, handler TCPHandler) error {
    server := &TcpServer{Addr: addr, Handler: handler, doneChan: make(chan struct{})}
    return server.ListenAndServe()
}

func (s *TcpServer) ListenAndServe() error {
   if s.shuttingDown() {
      return ErrServerClosed
   }
   if s.doneChan == nil {
      s.doneChan = make(chan struct{})
   }
   addr := s.Addr
   if addr == "" {
      return errors.New("need addr")
   }
   ln, err := net.Listen("tcp", addr)
   if err != nil {
      return err
   }
   return s.Serve(tcpKeepAliveListener{
      ln.(*net.TCPListener)})
}

func (s *TcpServer) Serve(l net.Listener) error {
    s.l = &onceCloseListener{Listener: l}
    defer s.l.Close() //执行listener关闭
    if s.BaseCtx == nil {
        s.BaseCtx = context.Background()
    }
    baseCtx := s.BaseCtx
    ctx := context.WithValue(baseCtx, ServerContextKey, s) <- 将TcpServer实例存入context中
    for {
        rw, e := l.Accept()
        if e != nil {
            select {
            case <-s.getDoneChan():
                return ErrServerClosed
            default:
            }
            fmt.Printf("accept fail, err: %v\n", e)
            continue
        }
        c := s.newConn(rw)
        go c.serve(ctx)
    }
    return nil
}

包装 net.Conntcp.conn

type conn struct {
    server     *TcpServer   // 反引用TcpServer
    remoteAddr string       // 发送端地址
    rwc        net.Conn
}

func (s *TcpServer) newConn(rwc net.Conn) *conn {
   c := &conn{
      server: s,
      rwc:    rwc,
   }
   // 设置参数
   if d := c.server.ReadTimeout; d != 0 {
      c.rwc.SetReadDeadline(time.Now().Add(d))
   }
   if d := c.server.WriteTimeout; d != 0 {
      c.rwc.SetWriteDeadline(time.Now().Add(d))
   }
   if d := c.server.KeepAliveTimeout; d != 0 {
      if tcpConn, ok := c.rwc.(*net.TCPConn); ok {
         tcpConn.SetKeepAlive(true)
         tcpConn.SetKeepAlivePeriod(d)
      }
   }
   return c
}

tcp.conn.Server(ctx) 调用回调函数进行服务处理

func (c *conn) serve(ctx context.Context) {
   defer func() {
      if err := recover(); err != nil && err != ErrAbortHandler {
         const size = 64 << 10
         buf := make([]byte, size)
         buf = buf[:runtime.Stack(buf, false)]
         fmt.Printf("tcp: panic serving %v: %v\n%s", c.remoteAddr, err, buf)
      }
      c.close()
   }()
   c.remoteAddr = c.rwc.RemoteAddr().String()
   ctx = context.WithValue(ctx, LocalAddrContextKey, c.rwc.LocalAddr())
   if c.server.Handler == nil {
      panic("handler empty")
   }
   c.server.Handler.ServeTCP(ctx, c.rwc)
}

这样,一个简单易用的TCP服务框架就搭建完成了,其中一些close() 等方法在此处没有展示出来,更多详细代码可在我的代码仓库中查看:https://github.com/Kirov7/fayUtils/net/tcp

扩展中间件的实现

扩展中间件功能的实现思路

  • 方法构建
    • 构建中间件URL路由
    • 构建URL的中间件方法数组
    • 使用Use方法整合路由与方法数组
  • 方法调用
    • 构建方法请求逻辑
    • 封装TCPHandler接口与TcpServer整合

TcpSliceRouter.Group(path) 方法,初始化路由分组(默认只能全局)

// 创建 Group
func (g *TcpSliceRouter) Group(path string) *TcpSliceGroup {
   if path != "/" {
      panic("only accept path=/")
   }
   return &TcpSliceGroup{
      TcpSliceRouter: g,
      path:           path,
   }
}

TcpSliceGroup.Use(middlewares ...TcpHandlerFunc) 构造回调方法

调用 Use 方法传入中间件集合,添加到切片 c.handlers

// 构造回调方法
func (g *TcpSliceGroup) Use(middlewares ...TcpHandlerFunc) *TcpSliceGroup {
   g.handlers = append(g.handlers, middlewares...)
   existsFlag := false
   for _, oldGroup := range g.TcpSliceRouter.groups {
      if oldGroup == g {
         existsFlag = true
      }
   }
   if !existsFlag {
      g.TcpSliceRouter.groups = append(g.TcpSliceRouter.groups, g)
   }
   return g
}

通过 NewTcpSliceRouterHandler 方法传入最后调用的逻辑方法coreFunc并传入已经 Use 了中间件的, TcpSliceRouter

func NewTcpSliceRouterHandler(coreFunc func(*TcpSliceRouterContext) tcp_server.TCPHandler, router *TcpSliceRouter) *TcpSliceRouterHandler {
   return &TcpSliceRouterHandler{
      coreFunc: coreFunc,
      router:   router,
   }
}

最终的回调函数 ServeTCP((ctx context.Context, conn net.Conn),初始化 context 之后将 coreFunc 追加到 c.handlers,重置执行光标,从第一个 c.handlers 开始执行中间件

func (w *TcpSliceRouterHandler) ServeTCP(ctx context.Context, conn net.Conn) {
   c := newTcpSliceRouterContext(conn, w.router, ctx)
   c.handlers = append(c.handlers, func(c *TcpSliceRouterContext) {
      w.coreFunc(c).ServeTCP(ctx, conn)
   })
   c.Reset()
   c.Next()
}

在中间件中自行调用Next()Abort()等中间件逻辑,最后所有中间件执行完毕之后执行 coreFunc(已经被追加到c.handlers的最后位置)

// 从最先加入中间件开始回调
func (c *TcpSliceRouterContext) Next() {
   c.index++
   for c.index < int8(len(c.handlers)) {
      c.handlers[c.index](c)
      c.index++
   }
}

// 跳出中间件方法
func (c *TcpSliceRouterContext) Abort() {
    c.index = abortIndex
}
暂无评论

发送评论 编辑评论


|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇