Update vendoring

This commit is contained in:
Jeff Mitchell
2016-04-26 00:18:04 +00:00
parent c26838e6da
commit 97810148f3
253 changed files with 14960 additions and 4479 deletions

View File

@@ -21,13 +21,26 @@ const (
MessageRequest = 1
MessageResponse = 2
MessageFinish = 3
MessageTimeout = 4
)
type PacketResponse struct {
Packet *ber.Packet
Error error
}
func (pr *PacketResponse) ReadPacket() (*ber.Packet, error) {
if (pr == nil) || (pr.Packet == nil && pr.Error == nil) {
return nil, NewError(ErrorNetwork, errors.New("ldap: could not retrieve response"))
}
return pr.Packet, pr.Error
}
type messagePacket struct {
Op int
MessageID int64
Packet *ber.Packet
Channel chan *ber.Packet
Channel chan *PacketResponse
}
type sendMessageFlags uint
@@ -44,7 +57,7 @@ type Conn struct {
isStartingTLS bool
Debug debugging
chanConfirm chan bool
chanResults map[int64]chan *ber.Packet
chanResults map[int64]chan *PacketResponse
chanMessage chan *messagePacket
chanMessageID chan int64
wgSender sync.WaitGroup
@@ -52,6 +65,7 @@ type Conn struct {
once sync.Once
outstandingRequests uint
messageMutex sync.Mutex
requestTimeout time.Duration
}
var _ Client = &Conn{}
@@ -97,12 +111,13 @@ func DialTLS(network, addr string, config *tls.Config) (*Conn, error) {
// NewConn returns a new Conn using conn for network I/O.
func NewConn(conn net.Conn, isTLS bool) *Conn {
return &Conn{
conn: conn,
chanConfirm: make(chan bool),
chanMessageID: make(chan int64),
chanMessage: make(chan *messagePacket, 10),
chanResults: map[int64]chan *ber.Packet{},
isTLS: isTLS,
conn: conn,
chanConfirm: make(chan bool),
chanMessageID: make(chan int64),
chanMessage: make(chan *messagePacket, 10),
chanResults: map[int64]chan *PacketResponse{},
requestTimeout: 0,
isTLS: isTLS,
}
}
@@ -133,6 +148,13 @@ func (l *Conn) Close() {
l.wgClose.Wait()
}
// Sets the time after a request is sent that a MessageTimeout triggers
func (l *Conn) SetTimeout(timeout time.Duration) {
if timeout > 0 {
l.requestTimeout = timeout
}
}
// Returns the next available messageID
func (l *Conn) nextMessageID() int64 {
if l.chanMessageID != nil {
@@ -167,9 +189,16 @@ func (l *Conn) StartTLS(config *tls.Config) error {
}
l.Debug.Printf("%d: waiting for response", messageID)
packet = <-channel
defer l.finishMessage(messageID)
packetResponse, ok := <-channel
if !ok {
return NewError(ErrorNetwork, errors.New("ldap: channel closed"))
}
packet, err = packetResponse.ReadPacket()
l.Debug.Printf("%d: got response %p", messageID, packet)
l.finishMessage(messageID)
if err != nil {
return err
}
if l.Debug {
if err := addLDAPDescriptions(packet); err != nil {
@@ -197,11 +226,11 @@ func (l *Conn) StartTLS(config *tls.Config) error {
return nil
}
func (l *Conn) sendMessage(packet *ber.Packet) (chan *ber.Packet, error) {
func (l *Conn) sendMessage(packet *ber.Packet) (chan *PacketResponse, error) {
return l.sendMessageWithFlags(packet, 0)
}
func (l *Conn) sendMessageWithFlags(packet *ber.Packet, flags sendMessageFlags) (chan *ber.Packet, error) {
func (l *Conn) sendMessageWithFlags(packet *ber.Packet, flags sendMessageFlags) (chan *PacketResponse, error) {
if l.isClosing {
return nil, NewError(ErrorNetwork, errors.New("ldap: connection closed"))
}
@@ -223,7 +252,7 @@ func (l *Conn) sendMessageWithFlags(packet *ber.Packet, flags sendMessageFlags)
l.messageMutex.Unlock()
out := make(chan *ber.Packet)
out := make(chan *PacketResponse)
message := &messagePacket{
Op: MessageRequest,
MessageID: packet.Children[0].Value.(int64),
@@ -283,40 +312,66 @@ func (l *Conn) processMessages() {
select {
case l.chanMessageID <- messageID:
messageID++
case messagePacket, ok := <-l.chanMessage:
case message, ok := <-l.chanMessage:
if !ok {
l.Debug.Printf("Shutting down - message channel is closed")
return
}
switch messagePacket.Op {
switch message.Op {
case MessageQuit:
l.Debug.Printf("Shutting down - quit message received")
return
case MessageRequest:
// Add to message list and write to network
l.Debug.Printf("Sending message %d", messagePacket.MessageID)
l.chanResults[messagePacket.MessageID] = messagePacket.Channel
// go routine
buf := messagePacket.Packet.Bytes()
l.Debug.Printf("Sending message %d", message.MessageID)
l.chanResults[message.MessageID] = message.Channel
buf := message.Packet.Bytes()
_, err := l.conn.Write(buf)
if err != nil {
l.Debug.Printf("Error Sending Message: %s", err.Error())
break
}
// Add timeout if defined
if l.requestTimeout > 0 {
go func() {
defer func() {
if err := recover(); err != nil {
log.Printf("ldap: recovered panic in RequestTimeout: %v", err)
}
}()
time.Sleep(l.requestTimeout)
timeoutMessage := &messagePacket{
Op: MessageTimeout,
MessageID: message.MessageID,
}
l.sendProcessMessage(timeoutMessage)
}()
}
case MessageResponse:
l.Debug.Printf("Receiving message %d", messagePacket.MessageID)
if chanResult, ok := l.chanResults[messagePacket.MessageID]; ok {
chanResult <- messagePacket.Packet
l.Debug.Printf("Receiving message %d", message.MessageID)
if chanResult, ok := l.chanResults[message.MessageID]; ok {
chanResult <- &PacketResponse{message.Packet, nil}
} else {
log.Printf("Received unexpected message %d", messagePacket.MessageID)
ber.PrintPacket(messagePacket.Packet)
log.Printf("Received unexpected message %d, %v", message.MessageID, l.isClosing)
ber.PrintPacket(message.Packet)
}
case MessageTimeout:
// Handle the timeout by closing the channel
// All reads will return immediately
if chanResult, ok := l.chanResults[message.MessageID]; ok {
chanResult <- &PacketResponse{message.Packet, errors.New("ldap: connection timed out")}
l.Debug.Printf("Receiving message timeout for %d", message.MessageID)
delete(l.chanResults, message.MessageID)
close(chanResult)
}
case MessageFinish:
// Remove from message list
l.Debug.Printf("Finished message %d", messagePacket.MessageID)
close(l.chanResults[messagePacket.MessageID])
delete(l.chanResults, messagePacket.MessageID)
l.Debug.Printf("Finished message %d", message.MessageID)
if chanResult, ok := l.chanResults[message.MessageID]; ok {
close(chanResult)
delete(l.chanResults, message.MessageID)
}
}
}
}