XRootD
Loading...
Searching...
No Matches
XrdSsiSessReal.cc File Reference
#include <cerrno>
#include <cinttypes>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <string>
#include <sys/types.h>
#include <netinet/in.h>
#include "XrdSsi/XrdSsiAtomics.hh"
#include "XrdSsi/XrdSsiRequest.hh"
#include "XrdSsi/XrdSsiRRAgent.hh"
#include "XrdSsi/XrdSsiRRInfo.hh"
#include "XrdSsi/XrdSsiScale.hh"
#include "XrdSsi/XrdSsiServReal.hh"
#include "XrdSsi/XrdSsiSessReal.hh"
#include "XrdSsi/XrdSsiTaskReal.hh"
#include "XrdSsi/XrdSsiTrace.hh"
#include "XrdSsi/XrdSsiUtils.hh"
#include "XrdSys/XrdSysError.hh"
#include "XrdSys/XrdSysHeaders.hh"
#include "Xrd/XrdScheduler.hh"
+ Include dependency graph for XrdSsiSessReal.cc:

Go to the source code of this file.

Namespaces

namespace  XrdSsi
 

Macros

#define INSERT(dlvar, curitem, newitem)
 
#define REMOVE(dlbase, dlvar, curitem)
 
#define SINGLETON(dlvar, theitem)    theitem ->dlvar .next == theitem
 

Macro Definition Documentation

◆ INSERT

#define INSERT (   dlvar,
  curitem,
  newitem 
)
Value:
newitem ->dlvar .next = curitem; \
newitem ->dlvar .prev = curitem ->dlvar .prev; \
curitem ->dlvar .prev-> dlvar .next = newitem; \
curitem ->dlvar .prev = newitem

Definition at line 63 of file XrdSsiSessReal.cc.

70 : curitem ->dlvar .next);\
71 curitem ->dlvar .prev-> dlvar .next = curitem ->dlvar .next;\
72 curitem ->dlvar .next-> dlvar .prev = curitem ->dlvar .prev;\
73 curitem ->dlvar .next = curitem;\
74 curitem ->dlvar .prev = curitem
75
76/******************************************************************************/
77/* L o c a l S t a t i c s */
78/******************************************************************************/
79
// File-local data.
//   dsProperty - property name passed to epFile.GetProperty() in XeqEvent() to
//                obtain the node name stored in sessNode.
//   sidMutex   - passed to Atomic_BEG/Atomic_END around the sidVal increment
//                in InitSession().
//   sidVal     - counter from which InitSession() draws new session IDs.
80namespace
81{
82 std::string dsProperty("DataServer");
83 XrdSsiMutex sidMutex;
84
85 Atomic(uint32_t) sidVal(0);
86}
87
88/******************************************************************************/
89/* G l o b a l s */
90/******************************************************************************/
91
// Objects declared extern here (i.e. defined in another translation unit):
//   schedP - scheduler used by UnHold() to queue a CleanUp job.
//   Log    - error logger used by Shutdown() to report a failed close.
92namespace XrdSsi
93{
94extern XrdScheduler *schedP;
95
96extern XrdSysError Log;
98}
99
100/******************************************************************************/
101/* L o c a l C l a s s e s */
102/******************************************************************************/
103
104namespace
105{
// Scheduler job that unprovisions a session. UnHold() schedules one of these
// when a session is released with cleanup requested and no tasks attached.
106class CleanUp : public XrdJob
107{
108public:
109
// Lock the session, unprovision it, then destroy this job object.
// NOTE(review): Unprovision() is documented in this file as being called with
// sessMutex locked and returning with it unlocked; presumably sessP->Lock()
// acquires that same mutex -- confirm against XrdSsiSessReal.hh.
110void DoIt() {sessP->Lock();
111 sessP->Unprovision();
112 delete this;
113 }
114
// sP is the session to unprovision; the pointer is stored, not owned.
115 CleanUp(XrdSsiSessReal *sP) : sessP(sP) {}
116 ~CleanUp() {}
117
118private:
119XrdSsiSessReal *sessP;
120};
121}
122
123/******************************************************************************/
124/* D e s t r u c t o r */
125/******************************************************************************/
126
128{
129 XrdSsiTaskReal *tP;
130
131 if (resKey) free(resKey);
132 if (sessName) free(sessName);
133 if (sessNode) free(sessNode);
134
135 while((tP = freeTask)) {freeTask = tP->attList.next; delete tP;}
136}
137
138/******************************************************************************/
139/* I n i t S e s s i o n */
140/******************************************************************************/
141
// (Re)initialize this session object: reset the task lists and state flags,
// release any previous resKey/sessName/sessNode strings, and optionally assign
// a new session ID.
//
// servP  - saved in myService. When newSID is true and servP is null the
//          session ID is set to 0xffffffff instead of being drawn from sidVal.
// sName  - session name; a strdup'd copy is kept in sessName (0 when null).
// uent   - saved in uEnt (presumably the entry number reserved via sidScale;
//          confirm against the caller).
// hold   - saved in isHeld.
// newSID - when true and servP is non-null, the next sidVal value becomes the
//          session ID and tident is rebuilt from it; when false sessID and
//          tident are left untouched.
142void XrdSsiSessReal::InitSession(XrdSsiServReal *servP, const char *sName,
143 int uent, bool hold, bool newSID)
144{
145 EPNAME("InitSession");
146 requestP = 0;
147 uEnt = uent;
148 attBase = 0;
149 freeTask = 0;
150 myService = servP;
151 nextTID = 0;
152 alocLeft = XrdSsiRRInfo::idMax;
153 isHeld = hold;
154 inOpen = false;
155 noReuse = false;
// Drop strings left over from a previous use of this object.
156 if (resKey) {free(resKey); resKey = 0;}
157 if (sessName) free(sessName);
158 sessName = (sName ? strdup(sName) : 0);
159 if (sessNode) free(sessNode);
160 sessNode = 0;
// NOTE(review): sName is allowed to be null above, yet it is streamed by the
// DEBUG statements below -- confirm DEBUG tolerates a null const char*.
161 if (newSID)
162 {if (servP == 0) sessID = 0xffffffff;
163 else {Atomic_BEG(sidMutex);
164 sessID = Atomic_INC(sidVal);
165 Atomic_END(sidMutex);
166 snprintf(tident, sizeof(tident), "S %u#", sessID);
167 DEBUG("new sess for "<<sName<<" uent="<<uent<<" hold="<<hold);
168 }
169 } else {
170 DEBUG("reuse sess for "<<sName<<" uent="<<uent<<" hold="<<hold);
171 }
172}
173
174/******************************************************************************/
175/* Private: N e w T a s k */
176/******************************************************************************/
177
178// Must be called with sessMutex locked!
179
// Obtain a task object for reqP (from the free list when possible, otherwise
// newly allocated while alocLeft permits), give it a fresh task ID, link it
// into the attached-task list and bind it to the request.
//
// Returns the task pointer, or 0 after reporting "Too many active requests."
// (EMLINK) to the request through XrdSsiUtils::RetErr().
180XrdSsiTaskReal *XrdSsiSessReal::NewTask(XrdSsiRequest *reqP)
181{
182 EPNAME("NewTask");
183 XrdSsiTaskReal *ptP, *tP;
184
185// Allocate a task object for this request
186//
187 if ((tP = freeTask)) freeTask = tP->attList.next;
188 else {if (!alocLeft || !(tP = new XrdSsiTaskReal(this)))
189 {XrdSsiUtils::RetErr(*reqP, "Too many active requests.", EMLINK);
190 return 0;
191 }
192 alocLeft--;
193 }
194
195// We always set a new task ID to avoid ID collisions. This is good for over
196// 194 days if we have 1 request/second. In practice, this will work for a
197// couple of years before wrapping. By then the ID's should be free.
198//
199 tP->SetTaskID(nextTID++, sessID);
200 nextTID &= XrdSsiRRInfo::idMax;
201
202// Initialize the task and return its pointer
203//
204 tP->Init(reqP, reqP->GetTimeOut());
205 DEBUG("New task=" <<tP <<" id=" <<tP->ID());
206
207// Insert the task into our list of tasks. INSERT links tP immediately before
// attBase in the doubly linked list; with an empty list tP simply becomes the
// base (presumably its attList links already point at itself -- confirm).
208//
209 if ((ptP = attBase)) {INSERT(attList, ptP, tP);}
210 else attBase = tP;
211
212// We will be using the session mutex for serialization. Afterwards, bind the
213// task to the request and return the task pointer.
214//
215 XrdSsiRRAgent::SetMutex(reqP, &sessMutex);
216 tP->BindRequest(*reqP);
217 return tP;
218}
219
220/******************************************************************************/
221/* P r o v i s i o n */
222/******************************************************************************/
223
// Start the open of epURL on epFile on behalf of request reqP.
//
// Returns true when the open was started: a task is queued for reqP and inOpen
// is set. Returns false when Open() reports a failure, after the error has
// been passed to the request through XrdSsiUtils::RetErr().
//
// NOTE(review): the declaration of oFlags and the middle arguments of the
// Open() call are not visible in this listing; verify against the real source.
224bool XrdSsiSessReal::Provision(XrdSsiRequest *reqP, const char *epURL)
225{
226 EPNAME("Provision");
227 XrdCl::XRootDStatus epStatus;
// sessMutex is handed to a monitor object here (presumably held until return).
228 XrdSsiMutexMon rHelp(&sessMutex);
230
231// Set retry flag as appropriate
232//
233 if (XrdSsiRRAgent::isaRetry(reqP, true)) oFlags |= XrdCl::OpenFlags::Refresh;
234
235// Issue the open and if the open was started, return success.
236//
237 DEBUG("Provisioning " <<epURL);
238 epStatus = epFile.Open((const std::string)epURL, oFlags,
241 reqP->GetTimeOut());
242
243// If there was an error, scuttle the request. Note that errors will be returned
244// on a separate thread to avoid hangs here.
245//
246 if (!epStatus.IsOK())
247 {std::string eTxt;
248 int eNum = XrdSsiUtils::GetErr(epStatus, eTxt);
249 XrdSsiUtils::RetErr(*reqP, eTxt.c_str(), eNum);
251 return false;
252 }
253
254// Queue a new task and indicate our state
// NOTE(review): NewTask() returns 0 when no task could be allocated; its
// result is not checked here and true is still returned -- confirm intended.
255//
256 NewTask(reqP);
257 inOpen = true;
258 return true;
259}
260
261/******************************************************************************/
262/* Private: R e l T a s k */
263/******************************************************************************/
264
// Dispose of task tP: when the session is not held the task is deleted;
// otherwise ClrEvent() is called on it and it is pushed onto the front of the
// freeTask list so that NewTask() can reuse it.
265void XrdSsiSessReal::RelTask(XrdSsiTaskReal *tP) // sessMutex locked!
266{
267 EPNAME("RelTask");
268
269// Do some debugging here
270//
271 DEBUG((isHeld ? "Recycling":"Deleting")<<" task="<<tP<<" id=" <<tP->ID());
272
273// Delete this task or place it on the free list
274//
275 if (!isHeld) delete tP;
276 else {tP->ClrEvent();
277 tP->attList.next = freeTask;
278 freeTask = tP;
279 }
280}
281
282/******************************************************************************/
283/* R u n */
284/******************************************************************************/
285
287{
288 XrdSsiMutexMon sessMon(sessMutex);
289 XrdSsiTaskReal *tP;
290
291// If we are not allowed to be reused, return to indicate try someone else
292//
293 if (noReuse) return false;
294
295// Reserve a stream ID. If we cannot then indicate we cannot be reused
296//
297 if (!XrdSsi::sidScale.rsvEnt(uEnt)) return false;
298
299// Queue a new task
300//
301 tP = NewTask(reqP);
302
303// If we are already open and we have a task, send off the request
304//
305 if (!inOpen && tP && !tP->SendRequest(sessNode)) noReuse = true;
306 return true;
307}
308
309/******************************************************************************/
310/* Private: S h u t d o w n */
311/******************************************************************************/
312
313// Called with sessMutex locked and return with it unlocked
314
// Delete every task on the free list, unlock sessMutex and hand this session
// back to the service through Recycle().
//
// epStatus - status of the operation that led here; examined only when
//            onClose is true.
// onClose  - true when called as the result of a close. If epStatus is then
//            not OK the failure is logged and the session is recycled with
//            reuse=false; in every other case sessName/sessNode are freed and
//            the session is recycled with reuse=!noReuse.
315void XrdSsiSessReal::Shutdown(XrdCl::XRootDStatus &epStatus, bool onClose)
316{
317 XrdSsiTaskReal *tP, *ntP = freeTask;
318
319// Delete all accumulated tasks
320//
321 while((tP = ntP)) {ntP = tP->attList.next; delete tP;}
322 freeTask = 0;
323
324// If the close failed then we cannot recycle this object as it is not reusable
// NOTE(review): sessName and sessNode are passed to %s below; confirm neither
// can be null on this path.
325//
326 if (onClose && !epStatus.IsOK())
327 {std::string eText;
328 int eNum = XrdSsiUtils::GetErr(epStatus, eText);
329 char mBuff[1024];
330 snprintf(mBuff, sizeof(mBuff), "Unprovision: %s@%s error; %d",
331 sessName, sessNode, eNum);
332 Log.Emsg("Shutdown", mBuff, eText.c_str());
333 sessMutex.UnLock();
334 myService->Recycle(this, false);
335 } else {
336 if (sessName) {free(sessName); sessName = 0;}
337 if (sessNode) {free(sessNode); sessNode = 0;}
338 sessMutex.UnLock();
339 myService->Recycle(this, !noReuse);
340 }
341}
342
343/******************************************************************************/
344/* T a s k F i n i s h e d */
345/******************************************************************************/
346
348{
349 EPNAME("TaskFin");
350// Lock our mutex
351//
352 sessMutex.Lock();
353
354// Remove task from the task list if it's in it and release the task object.
355//
356 if (tP == attBase || tP->attList.next != tP)
357 {REMOVE(attBase, attList, tP);}
358 RelTask(tP);
359
360// Return the request entry number
361//
363
364// If we are waiting for a provision to finish, simply exit as the event
365// handler will notice that there is no task and will unprovision. Otherwise
366//
367
368
369// If we can shutdown, then unprovision which will drive a shutdown. Note
370// that Unprovision() returns without the sessMutex, otherwise we must
371// unlock it before we return. A shutdown invalidates this object!
372//
373 if (!inOpen)
374 {if (!isHeld && !attBase) Unprovision();
375 else sessMutex.UnLock();
376 } else {
377 DEBUG("Unprovision deferred for " <<sessName);
378 sessMutex.UnLock();
379 }
380}
381
382/******************************************************************************/
383/* U n H o l d */
384/******************************************************************************/
385
// Release the hold on this session.
//
// If the session is currently held and has both a resKey and a service, the
// service is told to stop reusing that key. isHeld is then cleared and, when
// cleanup is true and no tasks are attached, a CleanUp job is scheduled to
// unprovision the session.
386void XrdSsiSessReal::UnHold(bool cleanup)
387{
388 XrdSsiMutexMon sessMon(sessMutex);
389
390// Immediately stop reuse of this object
391//
392 if (isHeld && resKey && myService) myService->StopReuse(resKey);
393
394// Turn off the hold flag and if we have no attached tasks, schedule shutdown
395//
396 isHeld = false;
397 if (cleanup && !attBase) XrdSsi::schedP->Schedule(new CleanUp(this));
398}
399
400/******************************************************************************/
401/* Private: U n p r o v i s i o n */
402/******************************************************************************/
403
404// Called with sessMutex locked and returns with it unlocked
405// Returns false if a shutdown occurred (i.e. session object no longer valid)
406
// Close the endpoint file, or shut the session down at once when that is not
// possible.
//
// Returns true when Close() was started successfully (sessMutex is unlocked
// here in that case). Returns false when Shutdown() was invoked, either
// because the file was not open or because Close() reported a failure.
407bool XrdSsiSessReal::Unprovision() // Called with sessMutex locked!
408{
409 EPNAME("Unprovision");
411
412// Clear any pending events
413//
414 DEBUG("Closing " <<sessName);
415
416// If the file is not open (it might be due to an open error) then do a
417// shutdown right away. Otherwise, try to close; if successful the event
418// handler will do the shutdown. Otherwise, we do a Futterwacken dance.
419//
420 if (!epFile.IsOpen()) {Shutdown(uStat, false); return false;}
421 else {uStat = epFile.Close((XrdCl::ResponseHandler *)this);
422 if (!uStat.IsOK()) {Shutdown(uStat, true); return false;}
423 else sessMutex.UnLock();
424 }
425 return true;
426}
427
428/******************************************************************************/
429/* X e q E v e n t */
430/******************************************************************************/
431
433 XrdCl::AnyObject **respP)
434{
435// Lock our mutex. Note that events like shutdown unlock the mutex. The only
436// events handled here are open() and close().
437//
438 sessMutex.Lock();
439 XrdSsiTaskReal *ztP, *ntP, *tP = attBase;
440
441// If we are not in the open phase then this is due to a close event. Simply
442// do a shutdown and return to stop event processing.
443//
444 if (!inOpen)
445 {Shutdown(*status, true); // sessMutex gets unlocked!
446 return -1; // This object no longer valid!
447 }
448
449// We are no longer in open. However, if open encountered an error then this
450// session cannot be reused because the file object is in a bad state.
451//
452 inOpen = false;
453 noReuse = !status->IsOK();
454
455// If we have no requests then we may want to simply shut down.
456// Note that shutdown and unprovision unlock the sessMutex.
457//
458 if (!tP)
459 {if (isHeld)
460 {sessMutex.UnLock();
461 return 1;
462 }
463 if (!status->IsOK()) {Shutdown(*status, false); return -1;}
464 else {if (!isHeld) return (Unprovision() ? 1 : -1);
465 else sessMutex.UnLock();
466 }
467 return 1; // Flush events and continue
468 }
469
470// We are here because the open finally completed. If the open failed, then
471// schedule an error for all pending tasks. The Finish() call on each will
472// drive the cleanup of this session.
473//
474 if (!status->IsOK())
475 {XrdSsiErrInfo eInfo;
476 XrdSsiUtils::SetErr(*status, eInfo);
477 do {tP->SchedError(&eInfo); tP = tP->attList.next;}
478 while(tP != attBase);
479 sessMutex.UnLock();
480 return -1; // Halt processing as this object may now be deleted
481 }
482
483// Obtain the endpoint name
484//
485 std::string currNode;
486 if (epFile.GetProperty(dsProperty, currNode))
487 {if (sessNode) free(sessNode);
488 sessNode = strdup(currNode.c_str());
489 } else sessNode = strdup("Unknown!");
490
491// Execute each pending request. Make sure not to reference the task object
492// chain pointer after invoking SendRequest() as it may become invalid.
493//
494 ztP = attBase;
495 do {ntP = tP->attList.next;
496 if (!tP->SendRequest(sessNode)) noReuse = true;
497 tP = ntP;
498 } while(tP != ztP);
499
500// We are done, field the next event
501//
502 sessMutex.UnLock();
503 return 0;
504}
#define DEBUG(x)
#define EPNAME(x)
#define Atomic(type)
#define Atomic_INC(x)
#define Atomic_END(x)
#define Atomic_BEG(x)
#define REMOVE(dlbase, dlvar, curitem)
#define INSERT(dlvar, curitem, newitem)
#define ID
XRootDStatus Close(ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
Definition XrdClFile.cc:151
bool IsOpen() const
Check if the file is open.
Definition XrdClFile.cc:846
XRootDStatus Open(const std::string &url, OpenFlags::Flags flags, Access::Mode mode, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
Definition XrdClFile.cc:99
bool GetProperty(const std::string &name, std::string &value) const
Definition XrdClFile.cc:878
Handle an async response.
virtual void DoIt()=0
void Schedule(XrdJob *jp)
void ClrEvent()
char tident[24]
static void SetMutex(XrdSsiRequest *rP, XrdSsiMutex *mP)
static bool isaRetry(XrdSsiRequest *rP, bool reset=false)
static const unsigned int idMax
uint16_t GetTimeOut()
void BindRequest(XrdSsiRequest &rqstR)
void retEnt(int xEnt)
void StopReuse(const char *resKey)
void Recycle(XrdSsiSessReal *sObj, bool reuse)
void InitSession(XrdSsiServReal *servP, const char *sName, int uent, bool hold, bool newSID=false)
bool Provision(XrdSsiRequest *reqP, const char *epURL)
bool Run(XrdSsiRequest *reqP)
int XeqEvent(XrdCl::XRootDStatus *status, XrdCl::AnyObject **respP)
XrdCl::File epFile
void UnHold(bool cleanup=true)
void TaskFinished(XrdSsiTaskReal *tP)
void Init(XrdSsiRequest *rP, unsigned short tmo=0)
XrdSsiTaskReal * next
void SchedError(XrdSsiErrInfo *eInfo=0)
bool SendRequest(const char *node)
void SetTaskID(uint32_t tid, uint32_t sid)
static int GetErr(XrdCl::XRootDStatus &Status, std::string &eText)
static void RetErr(XrdSsiRequest &reqP, const char *eTxt, int eNum)
static void SetErr(XrdCl::XRootDStatus &Status, XrdSsiErrInfo &eInfo)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
XrdScheduler * schedP
XrdSsiScale sidScale
XrdSysError Log
Flags
Open flags, may be or'd when appropriate.
@ Read
Open only for reading.
bool IsOK() const
We're fine.

◆ REMOVE

#define REMOVE (   dlbase,
  dlvar,
  curitem 
)
Value:
if (dlbase == curitem) dlbase = (SINGLETON(dlvar,curitem) \
? 0 : curitem ->dlvar .next);\
curitem ->dlvar .prev-> dlvar .next = curitem ->dlvar .next;\
curitem ->dlvar .next-> dlvar .prev = curitem ->dlvar .prev;\
curitem ->dlvar .next = curitem;\
curitem ->dlvar .prev = curitem
#define SINGLETON(dlvar, theitem)

Definition at line 69 of file XrdSsiSessReal.cc.

71 : curitem ->dlvar .next);\
72 curitem ->dlvar .prev-> dlvar .next = curitem ->dlvar .next;\
73 curitem ->dlvar .next-> dlvar .prev = curitem ->dlvar .prev;\
74 curitem ->dlvar .next = curitem;\
75 curitem ->dlvar .prev = curitem

◆ SINGLETON

#define SINGLETON (   dlvar,
  theitem 
)     theitem ->dlvar .next == theitem

Definition at line 60 of file XrdSsiSessReal.cc.