XRootD
Loading...
Searching...
No Matches
XrdClStream.cc
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3// Author: Lukasz Janyst <ljanyst@cern.ch>
4//------------------------------------------------------------------------------
5// This file is part of the XRootD software suite.
6//
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//
20// In applying this licence, CERN does not waive the privileges and immunities
21// granted to it by virtue of its status as an Intergovernmental Organization
22// or submit itself to any jurisdiction.
23//------------------------------------------------------------------------------
24
25#include "XrdCl/XrdClStream.hh"
26#include "XrdCl/XrdClSocket.hh"
27#include "XrdCl/XrdClChannel.hh"
29#include "XrdCl/XrdClLog.hh"
30#include "XrdCl/XrdClMessage.hh"
32#include "XrdCl/XrdClUtils.hh"
34#include "XrdCl/XrdClMonitor.hh"
39
40#include <sys/types.h>
41#include <algorithm>
42#include <sys/socket.h>
43#include <sys/time.h>
44
45namespace XrdCl
46{
47 //----------------------------------------------------------------------------
48 // Statics
49 //----------------------------------------------------------------------------
50 RAtomic_uint64_t Stream::sSessCntGen{0};
51
52 //----------------------------------------------------------------------------
53 // Incoming message helper
54 //----------------------------------------------------------------------------
56 {
57 InMessageHelper( Message *message = 0,
58 MsgHandler *hndlr = 0,
59 time_t expir = 0,
60 uint16_t actio = 0 ):
61 msg( message ), handler( hndlr ), expires( expir ), action( actio ) {}
62 void Reset()
63 {
64 msg = 0; handler = 0; expires = 0; action = 0;
65 }
68 time_t expires;
69 uint16_t action;
70 };
71
72 //----------------------------------------------------------------------------
73 // Sub stream helper
74 //----------------------------------------------------------------------------
92
93 //----------------------------------------------------------------------------
94 // Constructor
95 //----------------------------------------------------------------------------
96 Stream::Stream( const URL *url, const URL &prefer ):
97 pUrl( url ),
98 pPrefer( prefer ),
99 pTransport( 0 ),
100 pPoller( 0 ),
101 pTaskManager( 0 ),
102 pJobManager( 0 ),
103 pIncomingQueue( 0 ),
104 pChannelData( 0 ),
105 pLastStreamError( 0 ),
106 pConnectionCount( 0 ),
107 pConnectionInitTime( 0 ),
108 pAddressType( Utils::IPAll ),
109 pSessionId( 0 ),
110 pBytesSent( 0 ),
111 pBytesReceived( 0 )
112 {
113 pConnectionStarted.tv_sec = 0; pConnectionStarted.tv_usec = 0;
114 pConnectionDone.tv_sec = 0; pConnectionDone.tv_usec = 0;
115
116 std::ostringstream o;
117 o << pUrl->GetHostId();
118 pStreamName = o.str();
119
120 pConnectionWindow = Utils::GetIntParameter( *url, "ConnectionWindow",
122 pConnectionRetry = Utils::GetIntParameter( *url, "ConnectionRetry",
124 pStreamErrorWindow = Utils::GetIntParameter( *url, "StreamErrorWindow",
126
127 std::string netStack = Utils::GetStringParameter( *url, "NetworkStack",
129
130 pAddressType = Utils::String2AddressType( netStack );
131 if( pAddressType == Utils::AddressType::IPAuto )
132 {
134 if( !( stacks & XrdNetUtils::hasIP64 ) )
135 {
136 if( stacks & XrdNetUtils::hasIPv4 )
137 pAddressType = Utils::AddressType::IPv4;
138 else if( stacks & XrdNetUtils::hasIPv6 )
139 pAddressType = Utils::AddressType::IPv6;
140 }
141 }
142
143 Log *log = DefaultEnv::GetLog();
144 log->Debug( PostMasterMsg, "[%s] Stream parameters: Network Stack: %s, "
145 "Connection Window: %d, ConnectionRetry: %d, Stream Error "
146 "Window: %d", pStreamName.c_str(), netStack.c_str(),
147 pConnectionWindow, pConnectionRetry, pStreamErrorWindow );
148 }
149
150 //----------------------------------------------------------------------------
151 // Destructor
152 //----------------------------------------------------------------------------
154 {
155 Disconnect( true );
156
157 Log *log = DefaultEnv::GetLog();
158 log->Debug( PostMasterMsg, "[%s] Destroying stream",
159 pStreamName.c_str() );
160
161 MonitorDisconnection( XRootDStatus() );
162
163 SubStreamList::iterator it;
164 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
165 delete *it;
166 }
167
168 //----------------------------------------------------------------------------
169 // Initializer
170 //----------------------------------------------------------------------------
172 {
173 if( !pTransport || !pPoller || !pChannelData )
175
176 AsyncSocketHandler *s = new AsyncSocketHandler( *pUrl, pPoller, pTransport,
177 pChannelData, 0, this );
178 pSubStreams.push_back( new SubStreamData() );
179 pSubStreams[0]->socket = s;
180 return XRootDStatus();
181 }
182
183 //------------------------------------------------------------------------
184 // Make sure that the underlying socket handler gets write readiness
185 // events
186 //------------------------------------------------------------------------
188 {
189 XrdSysMutexHelper scopedLock( pMutex );
190
191 //--------------------------------------------------------------------------
192 // We are in the process of connecting the main stream, so we do nothing
193 // because when the main stream connection is established it will connect
194 // all the other streams
195 //--------------------------------------------------------------------------
196 if( pSubStreams[0]->status == Socket::Connecting )
197 return XRootDStatus();
198
199 //--------------------------------------------------------------------------
200 // The main stream is connected, so we can verify whether we have
201 // the up and the down stream connected and ready to handle data.
202 // If anything is not right we fall back to stream 0.
203 //--------------------------------------------------------------------------
204 if( pSubStreams[0]->status == Socket::Connected )
205 {
206 if( pSubStreams[path.down]->status != Socket::Connected )
207 path.down = 0;
208
209 if( pSubStreams[path.up]->status == Socket::Disconnected )
210 {
211 path.up = 0;
212 return pSubStreams[0]->socket->EnableUplink();
213 }
214
215 if( pSubStreams[path.up]->status == Socket::Connected )
216 return pSubStreams[path.up]->socket->EnableUplink();
217
218 return XRootDStatus();
219 }
220
221 //--------------------------------------------------------------------------
222 // The main stream is not connected, we need to check whether enough time
223 // has passed since we last encountered an error (if any) so that we could
224 // re-attempt the connection
225 //--------------------------------------------------------------------------
226 Log *log = DefaultEnv::GetLog();
227 time_t now = ::time(0);
228
229 if( now-pLastStreamError < pStreamErrorWindow )
230 return pLastFatalError;
231
232 gettimeofday( &pConnectionStarted, 0 );
233 ++pConnectionCount;
234
235 //--------------------------------------------------------------------------
236 // Resolve all the addresses of the host we're supposed to connect to
237 //--------------------------------------------------------------------------
238 XRootDStatus st = Utils::GetHostAddresses( pAddresses, *pUrl, pAddressType );
239 if( !st.IsOK() )
240 {
241 log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for "
242 "the host", pStreamName.c_str() );
243 pLastStreamError = now;
244 st.status = stFatal;
245 pLastFatalError = st;
246 return st;
247 }
248
249 if( pPrefer.IsValid() )
250 {
251 std::vector<XrdNetAddr> addrresses;
252 XRootDStatus st = Utils::GetHostAddresses( addrresses, pPrefer, pAddressType );
253 if( !st.IsOK() )
254 {
255 log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s",
256 pStreamName.c_str(), pPrefer.GetHostName().c_str() );
257 }
258 else
259 {
260 std::vector<XrdNetAddr> tmp;
261 tmp.reserve( pAddresses.size() );
262 // first add all remaining addresses
263 auto itr = pAddresses.begin();
264 for( ; itr != pAddresses.end() ; ++itr )
265 {
266 if( !HasNetAddr( *itr, addrresses ) )
267 tmp.push_back( *itr );
268 }
269 // then copy all 'preferred' addresses
270 std::copy( addrresses.begin(), addrresses.end(), std::back_inserter( tmp ) );
271 // and keep the result
272 pAddresses.swap( tmp );
273 }
274 }
275
277 pAddresses );
278
279 while( !pAddresses.empty() )
280 {
281 pSubStreams[0]->socket->SetAddress( pAddresses.back() );
282 pAddresses.pop_back();
283 pConnectionInitTime = ::time( 0 );
284 st = pSubStreams[0]->socket->Connect( pConnectionWindow );
285 if( st.IsOK() )
286 {
287 pSubStreams[0]->status = Socket::Connecting;
288 break;
289 }
290 }
291 return st;
292 }
293
294 //----------------------------------------------------------------------------
295 // Queue the message for sending
296 //----------------------------------------------------------------------------
298 MsgHandler *handler,
299 bool stateful,
300 time_t expires )
301 {
302 XrdSysMutexHelper scopedLock( pMutex );
303 Log *log = DefaultEnv::GetLog();
304
305 //--------------------------------------------------------------------------
306 // Check the session ID and bounce if needed
307 //--------------------------------------------------------------------------
308 if( msg->GetSessionId() &&
309 (pSubStreams[0]->status != Socket::Connected ||
310 pSessionId != msg->GetSessionId()) )
312
313 //--------------------------------------------------------------------------
314 // Decide on the path to send the message
315 //--------------------------------------------------------------------------
316 PathID path = pTransport->MultiplexSubStream( msg, *pChannelData );
317 if( pSubStreams.size() <= path.up )
318 {
319 log->Warning( PostMasterMsg, "[%s] Unable to send message %s through "
320 "substream %d, using 0 instead", pStreamName.c_str(),
321 msg->GetObfuscatedDescription().c_str(), path.up );
322 path.up = 0;
323 }
324
325 log->Dump( PostMasterMsg, "[%s] Sending message %s (%p) through "
326 "substream %d expecting answer at %d", pStreamName.c_str(),
327 msg->GetObfuscatedDescription().c_str(), (void*)msg, path.up, path.down );
328
329 //--------------------------------------------------------------------------
330 // Enable *a* path and insert the message to the right queue
331 //--------------------------------------------------------------------------
332 XRootDStatus st = EnableLink( path );
333 if( st.IsOK() )
334 {
335 pTransport->MultiplexSubStream( msg, *pChannelData, &path );
336 pSubStreams[path.up]->outQueue->PushBack( msg, handler,
337 expires, stateful );
338 }
339 else
340 st.status = stFatal;
341 return st;
342 }
343
344 //----------------------------------------------------------------------------
345 // Force connection
346 //----------------------------------------------------------------------------
348 {
349 XrdSysMutexHelper scopedLock( pMutex );
350 if( pSubStreams[0]->status == Socket::Connecting )
351 {
352 pSubStreams[0]->status = Socket::Disconnected;
353 XrdCl::PathID path( 0, 0 );
354 XrdCl::XRootDStatus st = EnableLink( path );
355 if( !st.IsOK() )
356 OnConnectError( 0, st );
357 }
358 }
359
360 //----------------------------------------------------------------------------
361 // Disconnect the stream
362 //----------------------------------------------------------------------------
363 void Stream::Disconnect( bool /*force*/ )
364 {
365 XrdSysMutexHelper scopedLock( pMutex );
366 SubStreamList::iterator it;
367 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
368 {
369 (*it)->socket->Close();
370 (*it)->status = Socket::Disconnected;
371 }
372 }
373
374 //----------------------------------------------------------------------------
375 // Handle a clock event
376 //----------------------------------------------------------------------------
377 void Stream::Tick( time_t now )
378 {
379 //--------------------------------------------------------------------------
380 // Check for timed-out requests and incoming handlers
381 //--------------------------------------------------------------------------
382 pMutex.Lock();
383 OutQueue q;
384 SubStreamList::iterator it;
385 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
386 q.GrabExpired( *(*it)->outQueue, now );
387 pMutex.UnLock();
388
390 pIncomingQueue->ReportTimeout( now );
391 }
392}
393
394//------------------------------------------------------------------------------
395// Handle message timeouts and reconnection in the future
396//------------------------------------------------------------------------------
397namespace
398{
399 class StreamConnectorTask: public XrdCl::Task
400 {
401 public:
402 //------------------------------------------------------------------------
403 // Constructor
404 //------------------------------------------------------------------------
405 StreamConnectorTask( const XrdCl::URL &url, const std::string &n ):
406 url( url )
407 {
408 std::string name = "StreamConnectorTask for ";
409 name += n;
410 SetName( name );
411 }
412
413 //------------------------------------------------------------------------
414 // Run the task
415 //------------------------------------------------------------------------
416 time_t Run( time_t )
417 {
419 return 0;
420 }
421
422 private:
423 XrdCl::URL url;
424 };
425}
426
427namespace XrdCl
428{
429 XRootDStatus Stream::RequestClose( Message &response )
430 {
431 ServerResponse *rsp = reinterpret_cast<ServerResponse*>( response.GetBuffer() );
432 if( rsp->hdr.dlen < 4 ) return XRootDStatus( stError );
433 Message *msg;
435 MessageUtils::CreateRequest( msg, req );
436 req->requestid = kXR_close;
437 memcpy( req->fhandle, reinterpret_cast<uint8_t*>( rsp->body.buffer.data ), 4 );
439 msg->SetSessionId( pSessionId );
440 NullResponseHandler *handler = new NullResponseHandler();
441 MessageSendParams params;
442 params.timeout = 0;
443 params.followRedirects = false;
444 params.stateful = true;
446 return MessageUtils::SendMessage( *pUrl, msg, handler, params, 0 );
447 }
448
449 //------------------------------------------------------------------------
450 // Check if message is a partial response
451 //------------------------------------------------------------------------
452 bool Stream::IsPartial( Message &msg )
453 {
454 ServerResponseHeader *rsphdr = (ServerResponseHeader*)msg.GetBuffer();
455 if( rsphdr->status == kXR_oksofar )
456 return true;
457
458 if( rsphdr->status == kXR_status )
459 {
460 ServerResponseStatus *rspst = (ServerResponseStatus*)msg.GetBuffer();
462 return true;
463 }
464
465 return false;
466 }
467
468 //----------------------------------------------------------------------------
469 // Call back when a message has been reconstructed
470 //----------------------------------------------------------------------------
471 void Stream::OnIncoming( uint16_t subStream,
472 std::shared_ptr<Message> msg,
473 uint32_t bytesReceived )
474 {
475 msg->SetSessionId( pSessionId );
476 pBytesReceived += bytesReceived;
477
478 MsgHandler *handler = nullptr;
479 uint16_t action = 0;
480 {
481 InMessageHelper &mh = pSubStreams[subStream]->inMsgHelper;
482 handler = mh.handler;
483 action = mh.action;
484 mh.Reset();
485 }
486
487 if( !IsPartial( *msg ) )
488 {
489 uint32_t streamAction = pTransport->MessageReceived( *msg, subStream,
490 *pChannelData );
491 if( streamAction & TransportHandler::DigestMsg )
492 return;
493
494 if( streamAction & TransportHandler::RequestClose )
495 {
496 RequestClose( *msg );
497 return;
498 }
499 }
500
501 Log *log = DefaultEnv::GetLog();
502
503 //--------------------------------------------------------------------------
504 // No handler, we discard the message ...
505 //--------------------------------------------------------------------------
506 if( !handler )
507 {
508 ServerResponse *rsp = (ServerResponse*)msg->GetBuffer();
509 log->Warning( PostMasterMsg, "[%s] Discarding received message: %p "
510 "(status=%d, SID=[%d,%d]), no MsgHandler found.",
511 pStreamName.c_str(), (void*)msg.get(), rsp->hdr.status,
512 rsp->hdr.streamid[0], rsp->hdr.streamid[1] );
513 return;
514 }
515
516 //--------------------------------------------------------------------------
517 // We have a handler, so we call the callback
518 //--------------------------------------------------------------------------
519 log->Dump( PostMasterMsg, "[%s] Handling received message: %p.",
520 pStreamName.c_str(), (void*)msg.get() );
521
523 {
524 log->Dump( PostMasterMsg, "[%s] Ignoring the processing handler for: %s.",
525 pStreamName.c_str(), msg->GetObfuscatedDescription().c_str() );
526
527 // if we are handling partial response we have to take down the timeout fence
528 if( IsPartial( *msg ) )
529 {
530 XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( handler );
531 if( xrdHandler ) xrdHandler->PartialReceived();
532 }
533
534 return;
535 }
536
537 Job *job = new HandleIncMsgJob( handler );
538 pJobManager->QueueJob( job );
539 }
540
541 //----------------------------------------------------------------------------
542 // Call when one of the sockets is ready to accept a new message
543 //----------------------------------------------------------------------------
544 std::pair<Message *, MsgHandler *>
545 Stream::OnReadyToWrite( uint16_t subStream )
546 {
547 XrdSysMutexHelper scopedLock( pMutex );
548 Log *log = DefaultEnv::GetLog();
549 if( pSubStreams[subStream]->outQueue->IsEmpty() )
550 {
551 log->Dump( PostMasterMsg, "[%s] Nothing to write, disable uplink",
552 pSubStreams[subStream]->socket->GetStreamName().c_str() );
553
554 pSubStreams[subStream]->socket->DisableUplink();
555 return std::make_pair( (Message *)0, (MsgHandler *)0 );
556 }
557
558 OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
559 h.msg = pSubStreams[subStream]->outQueue->PopMessage( h.handler,
560 h.expires,
561 h.stateful );
562
563 log->Debug( PostMasterMsg, "[%s] Duplicating MsgHandler: %p (message: %s) "
564 "from out-queue to in-queue, starting to send outgoing.",
565 pUrl->GetHostId().c_str(), (void*)h.handler,
566 h.msg->GetObfuscatedDescription().c_str() );
567
568 scopedLock.UnLock();
569
570 if( h.handler )
571 {
572 bool rmMsg = false;
573 pIncomingQueue->AddMessageHandler( h.handler, rmMsg );
574 if( rmMsg )
575 {
576 Log *log = DefaultEnv::GetLog();
577 log->Warning( PostMasterMsg, "[%s] Removed a leftover msg from the in-queue.",
578 pStreamName.c_str() );
579 }
580 h.handler->OnReadyToSend( h.msg );
581 }
582 return std::make_pair( h.msg, h.handler );
583 }
584
585 void Stream::DisableIfEmpty( uint16_t subStream )
586 {
587 XrdSysMutexHelper scopedLock( pMutex );
588 Log *log = DefaultEnv::GetLog();
589
590 if( pSubStreams[subStream]->outQueue->IsEmpty() )
591 {
592 log->Dump( PostMasterMsg, "[%s] All messages consumed, disable uplink",
593 pSubStreams[subStream]->socket->GetStreamName().c_str() );
594 pSubStreams[subStream]->socket->DisableUplink();
595 }
596 }
597
598 //----------------------------------------------------------------------------
599 // Call when a message is written to the socket
600 //----------------------------------------------------------------------------
601 void Stream::OnMessageSent( uint16_t subStream,
602 Message *msg,
603 uint32_t bytesSent )
604 {
605 pTransport->MessageSent( msg, subStream, bytesSent,
606 *pChannelData );
607 OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
608 pBytesSent += bytesSent;
609 if( h.handler )
610 {
611 // ensure expiration time is assigned if still in queue
612 pIncomingQueue->AssignTimeout( h.handler );
613 // OnStatusReady may cause the handler to delete itself, in
614 // which case the handler or the user callback may also delete msg
616 }
617 pSubStreams[subStream]->outMsgHelper.Reset();
618 }
619
620 //----------------------------------------------------------------------------
621 // Call back when a message has been reconstructed
622 //----------------------------------------------------------------------------
623 void Stream::OnConnect( uint16_t subStream )
624 {
625 XrdSysMutexHelper scopedLock( pMutex );
626 pSubStreams[subStream]->status = Socket::Connected;
627
628 std::string ipstack( pSubStreams[0]->socket->GetIpStack() );
629 Log *log = DefaultEnv::GetLog();
630 log->Debug( PostMasterMsg, "[%s] Stream %d connected (%s).", pStreamName.c_str(),
631 subStream, ipstack.c_str() );
632
633 if( subStream == 0 )
634 {
635 pLastStreamError = 0;
636 pLastFatalError = XRootDStatus();
637 pConnectionCount = 0;
638 uint16_t numSub = pTransport->SubStreamNumber( *pChannelData );
639 pSessionId = ++sSessCntGen;
640
641 //------------------------------------------------------------------------
642 // Create the streams if they don't exist yet
643 //------------------------------------------------------------------------
644 if( pSubStreams.size() == 1 && numSub > 1 )
645 {
646 for( uint16_t i = 1; i < numSub; ++i )
647 {
648 URL url = pTransport->GetBindPreference( *pUrl, *pChannelData );
649 AsyncSocketHandler *s = new AsyncSocketHandler( url, pPoller, pTransport,
650 pChannelData, i, this );
651 pSubStreams.push_back( new SubStreamData() );
652 pSubStreams[i]->socket = s;
653 }
654 }
655
656 //------------------------------------------------------------------------
657 // Connect the extra streams, if we fail we move all the outgoing items
658 // to stream 0, we don't need to enable the uplink here, because it
659 // should be already enabled after the handshaking process is completed.
660 //------------------------------------------------------------------------
661 if( pSubStreams.size() > 1 )
662 {
663 log->Debug( PostMasterMsg, "[%s] Attempting to connect %zu additional streams.",
664 pStreamName.c_str(), pSubStreams.size() - 1 );
665 for( size_t i = 1; i < pSubStreams.size(); ++i )
666 {
667 pSubStreams[i]->socket->SetAddress( pSubStreams[0]->socket->GetAddress() );
668 XRootDStatus st = pSubStreams[i]->socket->Connect( pConnectionWindow );
669 if( !st.IsOK() )
670 {
671 pSubStreams[0]->outQueue->GrabItems( *pSubStreams[i]->outQueue );
672 pSubStreams[i]->socket->Close();
673 }
674 else
675 {
676 pSubStreams[i]->status = Socket::Connecting;
677 }
678 }
679 }
680
681 //------------------------------------------------------------------------
682 // Inform monitoring
683 //------------------------------------------------------------------------
684 pBytesSent = 0;
685 pBytesReceived = 0;
686 gettimeofday( &pConnectionDone, 0 );
688 if( mon )
689 {
691 i.server = pUrl->GetHostId();
692 i.sTOD = pConnectionStarted;
693 i.eTOD = pConnectionDone;
694 i.streams = pSubStreams.size();
695
696 AnyObject qryResult;
697 std::string *qryResponse = nullptr;
698 pTransport->Query( TransportQuery::Auth, qryResult, *pChannelData );
699 qryResult.Get( qryResponse );
700
701 if (qryResponse) {
702 i.auth = *qryResponse;
703 delete qryResponse;
704 } else {
705 i.auth = "";
706 }
707
708 mon->Event( Monitor::EvConnect, &i );
709 }
710
711 //------------------------------------------------------------------------
712 // For every connected control-stream call the global on-connect handler
713 //------------------------------------------------------------------------
715 }
716 else if( pOnDataConnJob )
717 {
718 //------------------------------------------------------------------------
719 // For every connected data-stream call the on-connect handler
720 //------------------------------------------------------------------------
721 pJobManager->QueueJob( pOnDataConnJob.get(), 0 );
722 }
723 }
724
725 //----------------------------------------------------------------------------
726 // On connect error
727 //----------------------------------------------------------------------------
728 void Stream::OnConnectError( uint16_t subStream, XRootDStatus status )
729 {
730 XrdSysMutexHelper scopedLock( pMutex );
731 Log *log = DefaultEnv::GetLog();
732 pSubStreams[subStream]->socket->Close();
733 time_t now = ::time(0);
734
735 //--------------------------------------------------------------------------
736 // For every connection error call the global connection error handler
737 //--------------------------------------------------------------------------
739
740 //--------------------------------------------------------------------------
741 // If we connected subStream == 0 and cannot connect >0 then we just give
742 // up and move the outgoing messages to another queue
743 //--------------------------------------------------------------------------
744 if( subStream > 0 )
745 {
746 pSubStreams[subStream]->status = Socket::Disconnected;
747 pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
748 if( pSubStreams[0]->status == Socket::Connected )
749 {
750 XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
751 if( !st.IsOK() )
752 OnFatalError( 0, st, scopedLock );
753 return;
754 }
755
756 if( pSubStreams[0]->status == Socket::Connecting )
757 return;
758
759 OnFatalError( subStream, status, scopedLock );
760 return;
761 }
762
763 //--------------------------------------------------------------------------
764 // Check if we still have time to try and do something in the current window
765 //--------------------------------------------------------------------------
766 time_t elapsed = now-pConnectionInitTime;
767 log->Error( PostMasterMsg, "[%s] elapsed = %lld, pConnectionWindow = %d seconds.",
768 pStreamName.c_str(), (long long) elapsed, pConnectionWindow );
769
770 //------------------------------------------------------------------------
771 // If we have some IP addresses left we try them
772 //------------------------------------------------------------------------
773 if( !pAddresses.empty() )
774 {
775 XRootDStatus st;
776 do
777 {
778 pSubStreams[0]->socket->SetAddress( pAddresses.back() );
779 pAddresses.pop_back();
780 pConnectionInitTime = ::time( 0 );
781 st = pSubStreams[0]->socket->Connect( pConnectionWindow );
782 }
783 while( !pAddresses.empty() && !st.IsOK() );
784
785 if( !st.IsOK() )
786 OnFatalError( subStream, st, scopedLock );
787
788 return;
789 }
790 //------------------------------------------------------------------------
791 // If we still can retry with the same host name, we sleep until the end
792 // of the connection window and try
793 //------------------------------------------------------------------------
794 else if( elapsed < pConnectionWindow && pConnectionCount < pConnectionRetry
795 && !status.IsFatal() )
796 {
797 log->Info( PostMasterMsg, "[%s] Attempting reconnection in %lld seconds.",
798 pStreamName.c_str(), (long long) (pConnectionWindow - elapsed) );
799
800 Task *task = new ::StreamConnectorTask( *pUrl, pStreamName );
801 pTaskManager->RegisterTask( task, pConnectionInitTime+pConnectionWindow );
802 return;
803 }
804 //--------------------------------------------------------------------------
805 // We are out of the connection window, the only thing we can do here
806 // is re-resolving the host name and retrying if we still can
807 //--------------------------------------------------------------------------
808 else if( pConnectionCount < pConnectionRetry && !status.IsFatal() )
809 {
810 pAddresses.clear();
811 pSubStreams[0]->status = Socket::Disconnected;
812 PathID path( 0, 0 );
813 XRootDStatus st = EnableLink( path );
814 if( !st.IsOK() )
815 OnFatalError( subStream, st, scopedLock );
816 return;
817 }
818
819 //--------------------------------------------------------------------------
820 // Else, we fail
821 //--------------------------------------------------------------------------
822 OnFatalError( subStream, status, scopedLock );
823 }
824
825 //----------------------------------------------------------------------------
826 // Call back when an error has occurred
827 //----------------------------------------------------------------------------
828 void Stream::OnError( uint16_t subStream, XRootDStatus status )
829 {
830 XrdSysMutexHelper scopedLock( pMutex );
831 Log *log = DefaultEnv::GetLog();
832 pSubStreams[subStream]->socket->Close();
833 pSubStreams[subStream]->status = Socket::Disconnected;
834
835 log->Debug( PostMasterMsg, "[%s] Recovering error for stream #%d: %s.",
836 pStreamName.c_str(), subStream, status.ToString().c_str() );
837
838 //--------------------------------------------------------------------------
839 // Reinsert the stuff that we have failed to sent
840 //--------------------------------------------------------------------------
841 if( pSubStreams[subStream]->outMsgHelper.msg )
842 {
843 OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
844 pSubStreams[subStream]->outQueue->PushFront( h.msg, h.handler, h.expires,
845 h.stateful );
846 pIncomingQueue->RemoveMessageHandler(h.handler);
847 pSubStreams[subStream]->outMsgHelper.Reset();
848 }
849
850 //--------------------------------------------------------------------------
851 // Reinsert the receiving handler and reset any partially read partial
852 //--------------------------------------------------------------------------
853 if( pSubStreams[subStream]->inMsgHelper.handler )
854 {
855 InMessageHelper &h = pSubStreams[subStream]->inMsgHelper;
856 pIncomingQueue->ReAddMessageHandler( h.handler, h.expires );
857 XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( h.handler );
858 if( xrdHandler ) xrdHandler->PartialReceived();
859 h.Reset();
860 }
861
862 //--------------------------------------------------------------------------
863 // We are dealing with an error of a peripheral stream. If we don't have
864 // anything to send don't bother recovering. Otherwise move the requests
865 // to stream 0 if possible.
866 //--------------------------------------------------------------------------
867 if( subStream > 0 )
868 {
869 if( pSubStreams[subStream]->outQueue->IsEmpty() )
870 return;
871
872 if( pSubStreams[0]->status != Socket::Disconnected )
873 {
874 pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
875 if( pSubStreams[0]->status == Socket::Connected )
876 {
877 XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
878 if( !st.IsOK() )
879 OnFatalError( 0, st, scopedLock );
880 return;
881 }
882 }
883 OnFatalError( subStream, status, scopedLock );
884 return;
885 }
886
887 //--------------------------------------------------------------------------
888 // If we lost the stream 0 we have lost the session, we re-enable the
889 // stream if we still have things in one of the outgoing queues, otherwise
890 // there is not point to recover at this point.
891 //--------------------------------------------------------------------------
892 if( subStream == 0 )
893 {
894 MonitorDisconnection( status );
895
896 SubStreamList::iterator it;
897 size_t outstanding = 0;
898 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
899 outstanding += (*it)->outQueue->GetSizeStateless();
900
901 if( outstanding )
902 {
903 PathID path( 0, 0 );
904 XRootDStatus st = EnableLink( path );
905 if( !st.IsOK() )
906 {
907 OnFatalError( 0, st, scopedLock );
908 return;
909 }
910 }
911
912 //------------------------------------------------------------------------
913 // We're done here, unlock the stream mutex to avoid deadlocks and
914 // report the disconnection event to the handlers
915 //------------------------------------------------------------------------
916 log->Debug( PostMasterMsg, "[%s] Reporting disconnection to queued "
917 "message handlers.", pStreamName.c_str() );
918 OutQueue q;
919 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
920 q.GrabStateful( *(*it)->outQueue );
921 scopedLock.UnLock();
922
923 q.Report( status );
924 pIncomingQueue->ReportStreamEvent( MsgHandler::Broken, status );
925 pChannelEvHandlers.ReportEvent( ChannelEventHandler::StreamBroken, status );
926 return;
927 }
928 }
929
930 //------------------------------------------------------------------------
931 // Force error
932 //------------------------------------------------------------------------
933 void Stream::ForceError( XRootDStatus status, bool hush )
934 {
935 XrdSysMutexHelper scopedLock( pMutex );
936 Log *log = DefaultEnv::GetLog();
937 for( size_t substream = 0; substream < pSubStreams.size(); ++substream )
938 {
939 if( pSubStreams[substream]->status != Socket::Connected ) continue;
940 pSubStreams[substream]->socket->Close();
941 pSubStreams[substream]->status = Socket::Disconnected;
942
943 if( !hush )
944 log->Debug( PostMasterMsg, "[%s] Forcing error on disconnect: %s.",
945 pStreamName.c_str(), status.ToString().c_str() );
946
947 //--------------------------------------------------------------------
948 // Reinsert the stuff that we have failed to sent
949 //--------------------------------------------------------------------
950 if( pSubStreams[substream]->outMsgHelper.msg )
951 {
952 OutQueue::MsgHelper &h = pSubStreams[substream]->outMsgHelper;
953 pSubStreams[substream]->outQueue->PushFront( h.msg, h.handler, h.expires,
954 h.stateful );
955 pIncomingQueue->RemoveMessageHandler(h.handler);
956 pSubStreams[substream]->outMsgHelper.Reset();
957 }
958
959 //--------------------------------------------------------------------
960 // Reinsert the receiving handler and reset any partially read partial
961 //--------------------------------------------------------------------
962 if( pSubStreams[substream]->inMsgHelper.handler )
963 {
964 InMessageHelper &h = pSubStreams[substream]->inMsgHelper;
965 pIncomingQueue->ReAddMessageHandler( h.handler, h.expires );
966 XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( h.handler );
967 if( xrdHandler ) xrdHandler->PartialReceived();
968 h.Reset();
969 }
970 }
971
972 pConnectionCount = 0;
973
974 //------------------------------------------------------------------------
975 // We're done here, unlock the stream mutex to avoid deadlocks and
976 // report the disconnection event to the handlers
977 //------------------------------------------------------------------------
978 log->Debug( PostMasterMsg, "[%s] Reporting disconnection to queued "
979 "message handlers.", pStreamName.c_str() );
980
981 SubStreamList::iterator it;
982 OutQueue q;
983 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
984 q.GrabItems( *(*it)->outQueue );
985 scopedLock.UnLock();
986
987 q.Report( status );
988
989 pIncomingQueue->ReportStreamEvent( MsgHandler::Broken, status );
990 pChannelEvHandlers.ReportEvent( ChannelEventHandler::StreamBroken, status );
991 }
992
993 //----------------------------------------------------------------------------
994 // On fatal error
995 //----------------------------------------------------------------------------
996 void Stream::OnFatalError( uint16_t subStream,
997 XRootDStatus status,
998 XrdSysMutexHelper &lock )
999 {
1000 Log *log = DefaultEnv::GetLog();
1001 pSubStreams[subStream]->status = Socket::Disconnected;
1002 log->Error( PostMasterMsg, "[%s] Unable to recover: %s.",
1003 pStreamName.c_str(), status.ToString().c_str() );
1004
1005 //--------------------------------------------------------------------------
1006 // Don't set the stream error windows for authentication errors as the user
1007 // may refresh his credential at any time
1008 //--------------------------------------------------------------------------
1009 if( status.code != errAuthFailed )
1010 {
1011 pConnectionCount = 0;
1012 pLastStreamError = ::time(0);
1013 pLastFatalError = status;
1014 }
1015
1016 SubStreamList::iterator it;
1017 OutQueue q;
1018 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1019 q.GrabItems( *(*it)->outQueue );
1020 lock.UnLock();
1021
1022 status.status = stFatal;
1023 q.Report( status );
1024 pIncomingQueue->ReportStreamEvent( MsgHandler::FatalError, status );
1025 pChannelEvHandlers.ReportEvent( ChannelEventHandler::FatalError, status );
1026
1027 }
1028
1029 //----------------------------------------------------------------------------
1030 // Inform monitoring about disconnection
1031 //----------------------------------------------------------------------------
1032 void Stream::MonitorDisconnection( XRootDStatus status )
1033 {
1034 Monitor *mon = DefaultEnv::GetMonitor();
1035 if( mon )
1036 {
1037 Monitor::DisconnectInfo i;
1038 i.server = pUrl->GetHostId();
1039 i.rBytes = pBytesReceived;
1040 i.sBytes = pBytesSent;
1041 i.cTime = ::time(0) - pConnectionDone.tv_sec;
1042 i.status = status;
1043 mon->Event( Monitor::EvDisconnect, &i );
1044 }
1045 }
1046
1047 //----------------------------------------------------------------------------
1048 // Call back when a message has been reconstructed
1049 //----------------------------------------------------------------------------
1050 bool Stream::OnReadTimeout( uint16_t substream )
1051 {
1052 //--------------------------------------------------------------------------
1053 // We only take the main stream into account
1054 //--------------------------------------------------------------------------
1055 if( substream != 0 )
1056 return true;
1057
1058 //--------------------------------------------------------------------------
1059 // Check if there is no outgoing messages and if the stream TTL is elapesed.
1060 // It is assumed that the underlying transport makes sure that there is no
1061 // pending requests that are not answered, ie. all possible virtual streams
1062 // are de-allocated
1063 //--------------------------------------------------------------------------
1064 Log *log = DefaultEnv::GetLog();
1065 SubStreamList::iterator it;
1066 time_t now = time(0);
1067
1068 XrdSysMutexHelper scopedLock( pMutex );
1069 uint32_t outgoingMessages = 0;
1070 time_t lastActivity = 0;
1071 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1072 {
1073 outgoingMessages += (*it)->outQueue->GetSize();
1074 time_t sockLastActivity = (*it)->socket->GetLastActivity();
1075 if( lastActivity < sockLastActivity )
1076 lastActivity = sockLastActivity;
1077 }
1078
1079 if( !outgoingMessages )
1080 {
1081 bool disconnect = pTransport->IsStreamTTLElapsed( now-lastActivity,
1082 *pChannelData );
1083 if( disconnect )
1084 {
1085 log->Debug( PostMasterMsg, "[%s] Stream TTL elapsed, disconnecting...",
1086 pStreamName.c_str() );
1087 scopedLock.UnLock();
1088 //----------------------------------------------------------------------
1089 // Important note!
1090 //
1091 // This destroys the Stream object itself, the underlined
1092 // AsyncSocketHandler object (that called this method) and the Channel
1093 // object that aggregates this Stream.
1094 //
1095 // Additionally &(*pUrl) is used by ForceDisconnect to check if we are
1096 // in a Channel that was previously collapsed in a redirect.
1097 //----------------------------------------------------------------------
1099 return false;
1100 }
1101 }
1102
1103 //--------------------------------------------------------------------------
1104 // Check if the stream is broken
1105 //--------------------------------------------------------------------------
1106 XRootDStatus st = pTransport->IsStreamBroken( now-lastActivity,
1107 *pChannelData );
1108 if( !st.IsOK() )
1109 {
1110 scopedLock.UnLock();
1111 OnError( substream, st );
1112 return false;
1113 }
1114 return true;
1115 }
1116
1117 //----------------------------------------------------------------------------
1118 // Call back when a message has been reconstru
1119 //----------------------------------------------------------------------------
1120 bool Stream::OnWriteTimeout( uint16_t /*substream*/ )
1121 {
1122 return true;
1123 }
1124
1125 //----------------------------------------------------------------------------
1126 // Register channel event handler
1127 //----------------------------------------------------------------------------
1129 {
1130 pChannelEvHandlers.AddHandler( handler );
1131 }
1132
1133 //----------------------------------------------------------------------------
1134 // Remove a channel event handler
1135 //----------------------------------------------------------------------------
1137 {
1138 pChannelEvHandlers.RemoveHandler( handler );
1139 }
1140
1141 //----------------------------------------------------------------------------
1142 // Install a incoming message handler
1143 //----------------------------------------------------------------------------
1144 MsgHandler*
1145 Stream::InstallIncHandler( std::shared_ptr<Message> &msg, uint16_t stream )
1146 {
1147 InMessageHelper &mh = pSubStreams[stream]->inMsgHelper;
1148 if( !mh.handler )
1149 mh.handler = pIncomingQueue->GetHandlerForMessage( msg,
1150 mh.expires,
1151 mh.action );
1152
1153 if( !mh.handler )
1154 return nullptr;
1155
1156 if( mh.action & MsgHandler::Raw )
1157 return mh.handler;
1158 return nullptr;
1159 }
1160
1161 //----------------------------------------------------------------------------
1165 //----------------------------------------------------------------------------
1166 uint16_t Stream::InspectStatusRsp( uint16_t stream,
1167 MsgHandler *&incHandler )
1168 {
1169 InMessageHelper &mh = pSubStreams[stream]->inMsgHelper;
1170 if( !mh.handler )
1172
1173 uint16_t action = mh.handler->InspectStatusRsp();
1174 mh.action |= action;
1175
1176 if( action & MsgHandler::RemoveHandler )
1177 pIncomingQueue->RemoveMessageHandler( mh.handler );
1178
1179 if( action & MsgHandler::Raw )
1180 {
1181 incHandler = mh.handler;
1182 return MsgHandler::Raw;
1183 }
1184
1185 if( action & MsgHandler::Corrupted )
1186 return MsgHandler::Corrupted;
1187
1188 if( action & MsgHandler::More )
1189 return MsgHandler::More;
1190
1191 return MsgHandler::None;
1192 }
1193
1194 //----------------------------------------------------------------------------
1195 // Check if channel can be collapsed using given URL
1196 //----------------------------------------------------------------------------
1197 bool Stream::CanCollapse( const URL &url )
1198 {
1199 Log *log = DefaultEnv::GetLog();
1200
1201 //--------------------------------------------------------------------------
1202 // Resolve all the addresses of the host we're supposed to connect to
1203 //--------------------------------------------------------------------------
1204 std::vector<XrdNetAddr> prefaddrs;
1205 XRootDStatus st = Utils::GetHostAddresses( prefaddrs, url, pAddressType );
1206 if( !st.IsOK() )
1207 {
1208 log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s."
1209 , pStreamName.c_str(), url.GetHostName().c_str() );
1210 return false;
1211 }
1212
1213 //--------------------------------------------------------------------------
1214 // Resolve all the addresses of the alias
1215 //--------------------------------------------------------------------------
1216 std::vector<XrdNetAddr> aliasaddrs;
1217 st = Utils::GetHostAddresses( aliasaddrs, *pUrl, pAddressType );
1218 if( !st.IsOK() )
1219 {
1220 log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s."
1221 , pStreamName.c_str(), pUrl->GetHostName().c_str() );
1222 return false;
1223 }
1224
1225 //--------------------------------------------------------------------------
1226 // Now check if the preferred host is part of the alias
1227 //--------------------------------------------------------------------------
1228 auto itr = prefaddrs.begin();
1229 for( ; itr != prefaddrs.end() ; ++itr )
1230 {
1231 auto itr2 = aliasaddrs.begin();
1232 for( ; itr2 != aliasaddrs.end() ; ++itr2 )
1233 if( itr->Same( &*itr2 ) ) return true;
1234 }
1235
1236 return false;
1237 }
1238
1239 //------------------------------------------------------------------------
1240 // Query the stream
1241 //------------------------------------------------------------------------
1242 Status Stream::Query( uint16_t query, AnyObject &result )
1243 {
1244 switch( query )
1245 {
1247 {
1248 result.Set( new std::string( pSubStreams[0]->socket->GetIpAddr() ), false );
1249 return Status();
1250 }
1251
1253 {
1254 result.Set( new std::string( pSubStreams[0]->socket->GetIpStack() ), false );
1255 return Status();
1256 }
1257
1259 {
1260 result.Set( new std::string( pSubStreams[0]->socket->GetHostName() ), false );
1261 return Status();
1262 }
1263
1264 default:
1266 }
1267 }
1268
1269}
union ServerResponse::@0 body
kXR_char streamid[2]
Definition XProtocol.hh:914
kXR_unt16 requestid
Definition XProtocol.hh:228
@ kXR_oksofar
Definition XProtocol.hh:900
@ kXR_status
Definition XProtocol.hh:907
struct ServerResponseBody_Status bdy
kXR_char fhandle[4]
Definition XProtocol.hh:229
@ kXR_close
Definition XProtocol.hh:115
ServerResponseHeader hdr
void Set(Type object, bool own=true)
void Get(Type &object)
Retrieve the object being held.
@ FatalError
Stream has been broken and won't be recovered.
void RemoveHandler(ChannelEventHandler *handler)
Remove the channel event handler.
void AddHandler(ChannelEventHandler *handler)
Add a channel event handler.
void ReportEvent(ChannelEventHandler::ChannelEvent event, Status status)
Report an event to the channel event handlers.
static Monitor * GetMonitor()
Get the monitor object.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
void ReportTimeout(time_t now=0)
Timeout handlers.
void RemoveMessageHandler(MsgHandler *handler)
Remove a listener.
void ReAddMessageHandler(MsgHandler *handler, time_t expires)
Re-insert the handler without scanning the cached messages.
void ReportStreamEvent(MsgHandler::StreamEvent event, XRootDStatus status)
Report an event to the handlers.
MsgHandler * GetHandlerForMessage(std::shared_ptr< Message > &msg, time_t &expires, uint16_t &action)
void AssignTimeout(MsgHandler *handler)
void AddMessageHandler(MsgHandler *handler, bool &rmMsg)
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
Interface for a job to be run by the job manager.
Handle diagnostics.
Definition XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition XrdClLog.cc:231
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition XrdClLog.cc:248
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition XrdClLog.cc:299
void Info(uint64_t topic, const char *format,...)
Print an info.
Definition XrdClLog.cc:265
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition XrdClLog.cc:282
static void ProcessSendParams(MessageSendParams &sendParams)
Process sending params.
static XRootDStatus SendMessage(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams, LocalFileHandler *lFileHandler)
Send message.
static void CreateRequest(Message *&msg, Request *&req, uint32_t payloadSize=0)
Create a message.
The message representation used throughout the system.
const std::string & GetObfuscatedDescription() const
Get the description of the message with authz parameter obfuscated.
uint64_t GetSessionId() const
Get the session ID the message is meant for.
An abstract class to describe the client-side monitoring plugin interface.
@ EvDisconnect
DisconnectInfo: Logout from a server.
@ EvConnect
ConnectInfo: Login into a server.
virtual void Event(EventCode evCode, void *evData)=0
@ More
there are more (non-raw) data to be read
@ Ignore
Ignore the message.
virtual void OnReadyToSend(Message *msg)
@ FatalError
Stream has been broken and won't be recovered.
@ Broken
The stream is broken.
virtual uint16_t InspectStatusRsp()=0
virtual void OnStatusReady(const Message *message, XRootDStatus status)=0
The requested action has been performed and the status is available.
A synchronized queue for the outgoing data.
void GrabStateful(OutQueue &queue)
void GrabExpired(OutQueue &queue, time_t exp=0)
void GrabItems(OutQueue &queue)
void Report(XRootDStatus status)
Report status to all the handlers.
Status ForceReconnect(const URL &url)
Reconnect the channel.
Status ForceDisconnect(const URL &url)
Shut down a channel.
void NotifyConnErrHandler(const URL &url, const XRootDStatus &status)
Notify the global error connection handler.
void NotifyConnectHandler(const URL &url)
Notify the global on-connect handler.
A network socket.
SocketStatus
Status of the socket.
@ Disconnected
The socket is disconnected.
@ Connected
The socket is connected.
@ Connecting
The connection process is in progress.
XRootDStatus Send(Message *msg, MsgHandler *handler, bool stateful, time_t expires)
Queue the message for sending.
bool OnReadTimeout(uint16_t subStream) XRD_WARN_UNUSED_RESULT
On read timeout.
bool CanCollapse(const URL &url)
void ForceConnect()
Force connection.
void ForceError(XRootDStatus status, bool hush=false)
Force error.
Status Query(uint16_t query, AnyObject &result)
Query the stream.
void Disconnect(bool force=false)
Disconnect the stream.
XRootDStatus EnableLink(PathID &path)
Stream(const URL *url, const URL &prefer=URL())
Constructor.
void OnConnect(uint16_t subStream)
Call back when a message has been reconstructed.
void Tick(time_t now)
void OnConnectError(uint16_t subStream, XRootDStatus status)
On connect error.
~Stream()
Destructor.
bool OnWriteTimeout(uint16_t subStream) XRD_WARN_UNUSED_RESULT
On write timeout.
void DisableIfEmpty(uint16_t subStream)
Disables respective uplink if empty.
void RemoveEventHandler(ChannelEventHandler *handler)
Remove a channel event handler.
void OnMessageSent(uint16_t subStream, Message *msg, uint32_t bytesSent)
MsgHandler * InstallIncHandler(std::shared_ptr< Message > &msg, uint16_t stream)
void OnIncoming(uint16_t subStream, std::shared_ptr< Message > msg, uint32_t bytesReceived)
Call back when a message has been reconstructed.
void OnError(uint16_t subStream, XRootDStatus status)
On error.
uint16_t InspectStatusRsp(uint16_t stream, MsgHandler *&incHandler)
std::pair< Message *, MsgHandler * > OnReadyToWrite(uint16_t subStream)
XRootDStatus Initialize()
Initializer.
void RegisterEventHandler(ChannelEventHandler *handler)
Register channel event handler.
void RegisterTask(Task *task, time_t time, bool own=true)
Interface for a task to be run by the TaskManager.
virtual time_t Run(time_t now)=0
void SetName(const std::string &name)
Set name of the task.
@ RequestClose
Send a close request.
virtual uint32_t MessageReceived(Message &msg, uint16_t subStream, AnyObject &channelData)=0
Check if the message invokes a stream action.
virtual uint16_t SubStreamNumber(AnyObject &channelData)=0
Return a number of substreams per stream that should be created.
virtual bool IsStreamTTLElapsed(time_t inactiveTime, AnyObject &channelData)=0
Check if the stream should be disconnected.
virtual Status Query(uint16_t query, AnyObject &result, AnyObject &channelData)=0
Query the channel.
virtual PathID MultiplexSubStream(Message *msg, AnyObject &channelData, PathID *hint=0)=0
virtual URL GetBindPreference(const URL &url, AnyObject &channelData)=0
Get bind preference for the next data stream.
virtual Status IsStreamBroken(time_t inactiveTime, AnyObject &channelData)=0
virtual void MessageSent(Message *msg, uint16_t subStream, uint32_t bytesSent, AnyObject &channelData)=0
Notify the transport about a message having been sent.
URL representation.
Definition XrdClURL.hh:31
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition XrdClURL.hh:99
const std::string & GetHostName() const
Get the name of the target host.
Definition XrdClURL.hh:170
bool IsValid() const
Is the url valid.
Definition XrdClURL.cc:452
Random utilities.
Definition XrdClUtils.hh:50
static void LogHostAddresses(Log *log, uint64_t type, const std::string &hostId, std::vector< XrdNetAddr > &addresses)
Log all the addresses on the list.
static Status GetHostAddresses(std::vector< XrdNetAddr > &addresses, const URL &url, AddressType type)
Resolve IP addresses.
static AddressType String2AddressType(const std::string &addressType)
Interpret a string as address type, default to IPAll.
static int GetIntParameter(const URL &url, const std::string &name, int defaultVal)
Get a parameter either from the environment or URL.
Definition XrdClUtils.cc:81
static std::string GetStringParameter(const URL &url, const std::string &name, const std::string &defaultVal)
Get a parameter either from the environment or URL.
Handle/Process/Forward XRootD messages.
static void SetDescription(Message *msg)
Get the description of a message.
@ qryINIF
Only consider internet protocols via ifconfig.
static NetProt NetConfig(NetType netquery=qryINET, const char **eText=0)
const uint16_t errQueryNotSupported
const uint16_t errUninitialized
const uint16_t errOperationExpired
const uint16_t stFatal
Fatal error, it's still an error.
const uint16_t stError
An error occurred that could potentially be retried.
const uint64_t PostMasterMsg
const int DefaultStreamErrorWindow
const int DefaultConnectionRetry
const int DefaultConnectionWindow
const char *const DefaultNetworkStack
const uint16_t errInvalidSession
const uint16_t errAuthFailed
@ kXR_PartialResult
InMessageHelper(Message *message=0, MsgHandler *hndlr=0, time_t expir=0, uint16_t actio=0)
Describe a server login event.
std::string server
"user@host:port"
uint16_t streams
Number of streams.
timeval sTOD
gettimeofday() when login started
timeval eTOD
gettimeofday() when login ended
std::string auth
authentication protocol used or empty if none
Procedure execution status.
uint16_t code
Error type, or additional hints on what to do.
uint16_t status
Status of the execution.
bool IsOK() const
We're fine.
bool IsFatal() const
Fatal error.
std::string ToString() const
Create a string representation.
static const uint16_t IpAddr
static const uint16_t HostName
static const uint16_t IpStack
InMessageHelper inMsgHelper
AsyncSocketHandler * socket
OutQueue::MsgHelper outMsgHelper
Socket::SocketStatus status
static const uint16_t Auth
Transport name, returns std::string *.