package krpc

import (
	"fmt"
	"io"
	"reflect"
	"sync"
	"time"

	"krwu.top/krpc.v1/codec"
)

// request bundles the state of a single call read from a connection.
type request struct {
	h            *codec.Header // header read from the codec; h.Error is set before an error reply is sent
	argv, replyv reflect.Value // argument and reply values allocated through mtype
	mtype        *methodType   // method resolved from h.ServiceMethod
	svc          *service      // service resolved from h.ServiceMethod
}

// readRequestHeader reads the next header from cc.
//
// Any read error is returned to the caller. Errors other than io.EOF and
// io.ErrUnexpectedEOF are additionally printed, since the two EOF variants
// are not treated as worth reporting here.
func (s *Server) readRequestHeader(cc codec.Codec) (*codec.Header, error) {
	var h codec.Header
	if err := cc.ReadHeader(&h); err != nil {
		if err != io.EOF && err != io.ErrUnexpectedEOF {
			fmt.Println("rpc server: read header error:", err)
		}
		return nil, err
	}
	return &h, nil
}

// readRequest reads one complete request (header and body) from cc.
//
// Return contract:
//   - (nil, err) when the header could not be read, including io.EOF; there
//     is no header to reply to in that case.
//   - (req, err) with a non-nil req when the header was read but the
//     service/method lookup or the body decoding failed; req.h is populated
//     so the caller can still address an error reply.
//   - (req, nil) on success.
func (s *Server) readRequest(cc codec.Codec) (*request, error) {
	h, err := s.readRequestHeader(cc)
	if err != nil {
		// Every header error ends here. Letting io.EOF through would
		// dereference a nil header below. readRequestHeader has already
		// logged whatever it considers worth logging.
		return nil, err
	}

	req := &request{h: h}
	req.svc, req.mtype, err = s.findService(h.ServiceMethod)
	if err != nil {
		return req, err
	}
	req.argv = req.mtype.newArgv()
	req.replyv = req.mtype.newReplyv()

	// ReadBody is always handed a pointer: when argv is not itself a pointer
	// its address is passed instead.
	argvi := req.argv.Interface()
	if req.argv.Kind() != reflect.Ptr {
		argvi = req.argv.Addr().Interface()
	}
	if err = cc.ReadBody(argvi); err != nil {
		fmt.Println("rpc: server read body error: ", err)
		return req, err
	}
	return req, nil
}

// handleRequest invokes the method described by req and sends exactly one
// response on cc.
//
// wg.Done is called when handleRequest returns. sending is forwarded to
// sendResponse unchanged. A timeout of 0 disables the deadline. Otherwise the
// deadline covers the method call only: once the call has returned,
// handleRequest waits for the response to be sent regardless of the timer.
//
// If the deadline expires first, a timeout error is sent. The method call
// cannot be interrupted and keeps running in its goroutine, but its late
// result is discarded instead of being sent as a second response.
func (s *Server) handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup, timeout time.Duration) {
	defer wg.Done()

	// respond sends at most one response per request. sync.Once makes the
	// worker and the timeout path mutually exclusive, so req.h.Error is never
	// written concurrently and a late worker never touches the codec again.
	var once sync.Once
	respond := func(body interface{}, err error) {
		once.Do(func() {
			if err != nil {
				req.h.Error = err.Error()
			}
			s.sendResponse(cc, req.h, body, sending)
		})
	}

	// Both channels are buffered so the worker can always finish, even when
	// nobody is receiving any more after a timeout.
	called := make(chan struct{}, 1)
	sent := make(chan struct{}, 1)
	go func() {
		err := req.svc.call(req.mtype, req.argv, req.replyv)
		called <- struct{}{}
		if err != nil {
			respond(invalidRequest, err)
		} else {
			respond(req.replyv.Interface(), nil)
		}
		sent <- struct{}{}
	}()

	if timeout == 0 {
		<-called
		<-sent
		return
	}

	// An explicit timer (rather than time.After) can be released as soon as
	// the call finishes.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-timer.C:
		respond(invalidRequest, fmt.Errorf("rpc: server request handle timeout with %s", timeout))
	case <-called:
		<-sent
	}
}