XRootD
Loading...
Searching...
No Matches
XrdFrcReqFile.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d F r c R e q F i l e . c c */
4/* */
5/* (c) 2010 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* All Rights Reserved */
7/* Produced by Andrew Hanushevsky for Stanford University under contract */
8/* DE-AC02-76-SFO0515 with the Department of Energy */
9/* */
10/* This file is part of the XRootD software suite. */
11/* */
12/* XRootD is free software: you can redistribute it and/or modify it under */
13/* the terms of the GNU Lesser General Public License as published by the */
14/* Free Software Foundation, either version 3 of the License, or (at your */
15/* option) any later version. */
16/* */
17/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20/* License for more details. */
21/* */
22/* You should have received a copy of the GNU Lesser General Public License */
23/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25/* */
26/* The copyright holder's institutional names and contributor's names may not */
27/* be used to endorse or promote products derived from this software without */
28/* specific prior written permission of the institution or contributor. */
29/******************************************************************************/
30
31#include <cstring>
32#include <strings.h>
33#include <cstdio>
34#include <fcntl.h>
35#include <unistd.h>
36#include <cerrno>
37#include <sys/param.h>
38#include <sys/types.h>
39#include <sys/stat.h>
40
41#include "XrdFrc/XrdFrcCID.hh"
43#include "XrdFrc/XrdFrcTrace.hh"
44#include "XrdSys/XrdSysError.hh"
45#include "XrdSys/XrdSysFD.hh"
47
48using namespace XrdFrc;
49
50/******************************************************************************/
51/* S t a t i c V a r i a b l e s */
52/******************************************************************************/
53
54XrdSysMutex XrdFrcReqFile::rqMonitor::rqMutex;
55
56/******************************************************************************/
57/* C o n s t r u c t o r */
58/******************************************************************************/
59
60XrdFrcReqFile::XrdFrcReqFile(const char *fn, int aVal)
61{
62 char buff[1200];
63
64 memset((void *)&HdrData, 0, sizeof(HdrData));
65 reqFN = strdup(fn);
66 strcpy(buff, fn); strcat(buff, ".lock");
67 lokFN = strdup(buff);
68 lokFD = reqFD = -1;
69 isAgent = aVal;
70}
71
72/******************************************************************************/
73/* A d d */
74/******************************************************************************/
75
77{
78 rqMonitor rqMon(isAgent);
79 XrdFrcRequest tmpReq;
80 int fP;
81
82// Lock the file
83//
84 if (!FileLock()) {FailAdd(rP->LFN, 0); return;}
85
86// Obtain a free slot
87//
88 if ((fP = HdrData.Free))
89 {if (!reqRead((void *)&tmpReq, fP)) {FailAdd(rP->LFN, 1); return;}
90 HdrData.Free = tmpReq.Next;
91 } else {
92 struct stat buf;
93 if (fstat(reqFD, &buf))
94 {Say.Emsg("Add",errno,"stat",reqFN); FailAdd(rP->LFN, 1); return;}
95 fP = buf.st_size;
96 }
97
98// Chain in the request (registration requests go fifo)
99//
101 {if (!(rP->Next = HdrData.First)) HdrData.Last = fP;
102 HdrData.First = fP;
103 } else {
104 if (HdrData.First && HdrData.Last)
105 {if (!reqRead((void *)&tmpReq, HdrData.Last))
106 {FailAdd(rP->LFN, 1); return;}
107 tmpReq.Next = fP;
108 if (!reqWrite((void *)&tmpReq, HdrData.Last, 0))
109 {FailAdd(rP->LFN, 1); return;}
110 } else HdrData.First = fP;
111 HdrData.Last = fP; rP->Next = 0;
112 }
113
114// Write out the file
115//
116 rP->This = fP;
117 if (!reqWrite(rP, fP)) FailAdd(rP->LFN, 0);
118 FileLock(lkNone);
119}
120
121/******************************************************************************/
122/* C a n */
123/******************************************************************************/
124
126{
127 rqMonitor rqMon(isAgent);
128 XrdFrcRequest tmpReq;
129 int Offs, numCan = 0, numBad = 0;
130 struct stat buf;
131 char txt[128];
132
133// Lock the file and get its size
134//
135 if (!FileLock() || fstat(reqFD, &buf)) {FailCan(rP->ID, 0); return;}
136
137// Run through all of the file entries removing matching requests
138//
139 for (Offs = ReqSize; Offs < buf.st_size; Offs += ReqSize)
140 {if (!reqRead((void *)&tmpReq, Offs)) return FailCan(rP->ID);
141 if (!strcmp(tmpReq.ID, rP->ID))
142 {tmpReq.LFN[0] = '\0';
143 if (!reqWrite((void *)&tmpReq, Offs, 0)) numBad++;
144 else numCan++;
145 }
146 }
147
148// Make sure this is written to disk
149//
150 if (numCan) fsync(reqFD);
151
152// Document the action
153//
154 if (numCan || numBad)
155 {sprintf(txt, "has %d entries; %d removed (%d failures).",
156 numCan+numBad, numCan, numBad);
157 Say.Emsg("Can", rP->ID, txt);
158 }
159 FileLock(lkNone);
160}
161
162/******************************************************************************/
163/* D e l */
164/******************************************************************************/
165
167{
168 rqMonitor rqMon(isAgent);
169 XrdFrcRequest tmpReq;
170
171// Lock the file
172//
173 if (!FileLock()) {FailDel(rP->LFN, 0); return;}
174
175// Put entry on the free chain
176//
177 memset(&tmpReq, 0, sizeof(tmpReq));
178 tmpReq.Next = HdrData.Free;
179 HdrData.Free = rP->This;
180 if (!reqWrite((void *)&tmpReq, rP->This)) FailDel(rP->LFN, 0);
181 FileLock(lkNone);
182}
183
184/******************************************************************************/
185/* G e t */
186/******************************************************************************/
187
189{
190 int fP, rc;
191
192// Lock the file
193//
194 if (!FileLock()) return 0;
195
196// Get the next request
197//
198 while((fP = HdrData.First))
199 {if (!reqRead((void *)rP, fP)) {FileLock(lkNone); return 0;}
200 HdrData.First= rP->Next;
201 if (*(rP->LFN)) {reqWrite(0,0,1); break;}
202 rP->Next = HdrData.Free;
203 HdrData.Free = fP;
204 if (!reqWrite(rP, fP)) {fP = 0; break;}
205 }
206 if (fP) rc = (HdrData.First ? 1 : -1);
207 else rc = 0;
208 FileLock(lkNone);
209 return rc;
210}
211
212/******************************************************************************/
213/* I n i t */
214/******************************************************************************/
215
217{
218 EPNAME("Init");
219 static const int Mode = S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH;
220 XrdFrcRequest tmpReq;
221 struct stat buf;
222 recEnt *RegList = 0, *First = 0, *rP, *pP, *tP;
223 int Offs, rc, numreq = 0;
224
225// Open the lock file first in r/w mode
226//
227 if ((lokFD = XrdSysFD_Open(lokFN, O_RDWR|O_CREAT, Mode)) < 0)
228 {Say.Emsg("Init",errno,"open",lokFN); return 0;}
229
230// Obtain a lock
231//
232 if (!FileLock(lkInit)) return 0;
233
234// Open the file first in r/w mode
235//
236 if ((reqFD = XrdSysFD_Open(reqFN, O_RDWR|O_CREAT, Mode)) < 0)
237 {FileLock(lkNone);
238 Say.Emsg("Init",errno,"open",reqFN);
239 return 0;
240 }
241
242// Check for a new file here
243//
244 if (fstat(reqFD, &buf)) return FailIni("stat");
245 if (buf.st_size < ReqSize)
246 {memset(&tmpReq, 0, sizeof(tmpReq));
247 HdrData.Free = ReqSize;
248 if (!reqWrite((void *)&tmpReq, ReqSize)) return FailIni("init file");
249 FileLock(lkNone);
250 return 1;
251 }
252
253// We are done if this is a agent
254//
255 if (isAgent)
256 {FileLock(lkNone);
257 return 1;
258 }
259
260// Read the full file
261//
262 for (Offs = ReqSize; Offs < buf.st_size; Offs += ReqSize)
263 {if (!reqRead((void *)&tmpReq, Offs)) return FailIni("read file");
264 if (*tmpReq.LFN == '\0' || !tmpReq.addTOD
265 || tmpReq.Opaque >= int(sizeof(tmpReq.LFN))) continue;
266 pP = 0; rP = First; tP = new recEnt(tmpReq); numreq++;
267 if (tmpReq.Options & XrdFrcRequest::Register)
268 {tP->Next = RegList; RegList = tP;
269 } else {
270 while(rP && rP->reqData.addTOD < tmpReq.addTOD) {pP=rP;rP=rP->Next;}
271 if (pP) pP->Next = tP;
272 else First = tP;
273 tP->Next = rP;
274 }
275 }
276
277// Plase registration requests in the front
278//
279 while((rP = RegList))
280 {RegList = rP->Next;
281 rP->Next = First;
282 First = rP;
283 }
284
285// Now write out the file
286//
287 DEBUG(numreq <<" request(s) recovered from " <<reqFN);
288 rc = ReWrite(First);
289
290// Delete all the entries in memory while referencing known instance names
291//
292 while((tP = First))
293 {First = tP->Next;
294 CID.Ref(tP->reqData.iName);
295 delete tP;
296 }
297
298// All done
299//
300 FileLock(lkNone);
301 return rc;
302}
303
304/******************************************************************************/
305/* L i s t */
306/******************************************************************************/
307
308char *XrdFrcReqFile::List(char *Buff, int bsz, int &Offs,
309 XrdFrcRequest::Item *ITList, int ITNum)
310{
311 rqMonitor rqMon(isAgent);
312 XrdFrcRequest tmpReq;
313 int rc;
314
315// Set Offs argument
316//
317 if (Offs < ReqSize) Offs = ReqSize;
318
319// Lock the file
320//
321 if (!FileLock(lkShare)) return 0;
322
323// Return next valid filename
324//
325 do{do {rc = pread(reqFD, (void *)&tmpReq, ReqSize, Offs);}
326 while(rc < 0 && errno == EINTR);
327 if (rc == ReqSize)
328 {Offs += ReqSize;
329 if (*tmpReq.LFN == '\0' || !tmpReq.addTOD
330 || tmpReq.Opaque >= int(sizeof(tmpReq.LFN))
331 || tmpReq.Options & XrdFrcRequest::Register) continue;
332 FileLock(lkNone);
333 if (!ITNum || !ITList) strlcpy(Buff, tmpReq.LFN, bsz);
334 else ListL(tmpReq, Buff, bsz, ITList, ITNum);
335 return Buff;
336 }
337 } while(rc == ReqSize);
338
339// Diagnose ending condition
340//
341 if (rc < 0) Say.Emsg("List",errno,"read",reqFN);
342
343// Return end of list
344//
345 FileLock(lkNone);
346 return 0;
347}
348
349/******************************************************************************/
350/* L i s t L */
351/******************************************************************************/
352
353void XrdFrcReqFile::ListL(XrdFrcRequest &tmpReq, char *Buff, int bsz,
354 XrdFrcRequest::Item *ITList, int ITNum)
355{
356 char What, tbuf[32];
357 long long tval;
358 int i, k, n, bln = bsz-2, Lfo;
359
360 for (i = 0; i < ITNum && bln > 0; i++)
361 {Lfo = tmpReq.LFO;
362 switch(ITList[i])
364 Lfo = 0;
365 n = strlen(tmpReq.LFN);
366 strlcpy(Buff, tmpReq.LFN, bln);
367 break;
368
370 n = strlen(tmpReq.LFN+Lfo);
371 strlcpy(Buff, tmpReq.LFN+Lfo, bln);
372 break;
373
375 Lfo = 0;
376 n = strlen(tmpReq.LFN); tmpReq.LFN[n] = '?';
377 if (!tmpReq.Opaque) tmpReq.LFN[n+1] = '\0';
378 strlcpy(Buff, tmpReq.LFN, bln);
379 k = strlen(tmpReq.LFN);
380 tmpReq.LFN[n] = '\0'; n = k;
381 break;
382
384 n = strlen(tmpReq.LFN); tmpReq.LFN[n] = '?';
385 if (!tmpReq.Opaque) tmpReq.LFN[n+1] = '\0';
386 strlcpy(Buff, tmpReq.LFN+Lfo, bln);
387 k = strlen(tmpReq.LFN+Lfo);
388 tmpReq.LFN[n] = '\0'; n = k;
389 break;
390
392 n = 0;
393 What = (tmpReq.Options & XrdFrcRequest::makeRW
394 ? 'w' : 'r');
395 if (bln) {Buff[n] = What; n++;}
396 if (tmpReq.Options & XrdFrcRequest::msgFail)
397 if (bln-n > 0) {Buff[n] = 'f'; n++;}
398 if (tmpReq.Options & XrdFrcRequest::msgSucc)
399 if (bln-n > 0) {Buff[n] = 'n'; n++;}
400 break;
401
403 n = strlen(tmpReq.Notify);
404 strlcpy(Buff, tmpReq.Notify, bln);
405 break;
406
408 *Buff = tmpReq.OPc;
409 n = 1;
410 break;
411
413 if (tmpReq.Prty == 2) What = '2';
414 else if (tmpReq.Prty == 1) What = '1';
415 else What = '0';
416 n = 1;
417 if (bln) *Buff = What;
418 break;
419
422 tval = tmpReq.addTOD;
423 if (ITList[i] == XrdFrcRequest::getQWT) tval = time(0)-tval;
424 if ((n = sprintf(tbuf, "%lld", tval)) >= 0)
425 strlcpy(Buff, tbuf, bln);
426 break;
427
429 n = strlen(tmpReq.ID);
430 strlcpy(Buff, tmpReq.ID, bln);
431 break;
432
434 n = strlen(tmpReq.User);
435 strlcpy(Buff, tmpReq.User, bln);
436 break;
437
438 default: n = 0; break;
439 }
440 if (bln > 0) {bln -= n; Buff += n;}
441 if (bln > 0) {*Buff++ = ' '; bln--;}
442 }
443 *Buff = '\0';
444}
445
446/******************************************************************************/
447/* F a i l A d d */
448/******************************************************************************/
449
450void XrdFrcReqFile::FailAdd(char *lfn, int unlk)
451{
452 Say.Emsg("Add", lfn, "not added to prestage queue.");
453 if (unlk) FileLock(lkNone);
454}
455
456/******************************************************************************/
457/* F a i l C a n */
458/******************************************************************************/
459
460void XrdFrcReqFile::FailCan(char *rid, int unlk)
461{
462 Say.Emsg("Can", rid, "request not removed from prestage queue.");
463 if (unlk) FileLock(lkNone);
464}
465
466/******************************************************************************/
467/* F a i l D e l */
468/******************************************************************************/
469
470void XrdFrcReqFile::FailDel(char *lfn, int unlk)
471{
472 Say.Emsg("Del", lfn, "not removed from prestage queue.");
473 if (unlk) FileLock(lkNone);
474}
475
476/******************************************************************************/
477/* F a i l I n i */
478/******************************************************************************/
479
480int XrdFrcReqFile::FailIni(const char *txt)
481{
482 Say.Emsg("Init", errno, txt, reqFN);
483 FileLock(lkNone);
484 return 0;
485}
486
487/******************************************************************************/
488/* F i l e L o c k */
489/******************************************************************************/
490
491int XrdFrcReqFile::FileLock(LockType lktype)
492{
493 FLOCK_t lock_args;
494 const char *What;
495 int rc;
496
497// Establish locking options
498//
499 memset(&lock_args, 0, sizeof(lock_args));
500 lock_args.l_whence = SEEK_SET;
501 if (lktype == lkNone)
502 {lock_args.l_type = F_UNLCK; What = "unlock";
503 if (isAgent && reqFD >= 0) {close(reqFD); reqFD = -1;}
504 }
505 else {lock_args.l_type = (lktype == lkShare ? F_RDLCK : F_WRLCK);
506 What = "lock";
507 flMutex.Lock();
508 }
509
510// Perform action.
511//
512 do {rc = fcntl(lokFD,F_SETLKW,&lock_args);}
513 while(rc < 0 && errno == EINTR);
514 if (rc < 0) {Say.Emsg("FileLock", errno, What , lokFN); return 0;}
515
516// Refresh the header
517//
518 if (lktype == lkExcl || lktype == lkShare)
519 {if (reqFD < 0 && (reqFD = XrdSysFD_Open(reqFN, O_RDWR)) < 0)
520 {Say.Emsg("FileLock",errno,"open",reqFN);
521 FileLock(lkNone);
522 return 0;
523 }
524 do {rc = pread(reqFD, (void *)&HdrData, sizeof(HdrData), 0);}
525 while(rc < 0 && errno == EINTR);
526 if (rc < 0) {Say.Emsg("reqRead",errno,"refresh hdr from", reqFN);
527 FileLock(lkNone); return 0;
528 }
529 } else if (lktype == lkNone) flMutex.UnLock();
530
531// All done
532//
533 return 1;
534}
535
536/******************************************************************************/
537/* r e q R e a d */
538/******************************************************************************/
539
540int XrdFrcReqFile::reqRead(void *Buff, int Offs)
541{
542 int rc;
543
544 do {rc = pread(reqFD, Buff, ReqSize, Offs);} while(rc < 0 && errno == EINTR);
545 if (rc < 0) {Say.Emsg("reqRead",errno,"read",reqFN); return 0;}
546 return 1;
547}
548
549/******************************************************************************/
550/* r e q W r i t e */
551/******************************************************************************/
552
553int XrdFrcReqFile::reqWrite(void *Buff, int Offs, int updthdr)
554{
555 int rc = 0;
556
557 if (Buff && Offs) do {rc = pwrite(reqFD, Buff, ReqSize, Offs);}
558 while(rc < 0 && errno == EINTR);
559 if (rc >= 0 && updthdr){do {rc = pwrite(reqFD,&HdrData, sizeof(HdrData), 0);}
560 while(rc < 0 && errno == EINTR);
561 if (rc >= 0) rc = fsync(reqFD);
562 }
563 if (rc < 0) {Say.Emsg("reqWrite",errno,"write", reqFN); return 0;}
564 return 1;
565}
566
567/******************************************************************************/
568/* R e W r i t e */
569/******************************************************************************/
570
571int XrdFrcReqFile::ReWrite(XrdFrcReqFile::recEnt *rP)
572{
573 static const int Mode = S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH;
574 char newFN[MAXPATHLEN], *oldFN;
575 int newFD, oldFD, Offs = ReqSize, aOK = 1;
576
577// Construct new file and open it
578//
579 strcpy(newFN, reqFN); strcat(newFN, ".new");
580 if ((newFD = XrdSysFD_Open(newFN, O_RDWR|O_CREAT|O_TRUNC, Mode)) < 0)
581 {Say.Emsg("ReWrite",errno,"open",newFN); FileLock(lkNone); return 0;}
582
583// Setup to write/swap the file
584//
585 oldFD = reqFD; reqFD = newFD;
586 oldFN = reqFN; reqFN = newFN;
587
588// Rewrite all records if we have any
589//
590 if (rP)
591 {HdrData.First = Offs;
592 while(rP)
593 {rP->reqData.This = Offs;
594 rP->reqData.Next = (rP->Next ? Offs+ReqSize : 0);
595 if (!reqWrite((void *)&rP->reqData, Offs, 0)) {aOK = 0; break;}
596 Offs += ReqSize;
597 rP = rP->Next;
598 }
599 HdrData.Last = Offs - ReqSize;
600 } else {
601 HdrData.First = HdrData.Last = 0;
602 if (ftruncate(newFD, ReqSize) < 0)
603 {Say.Emsg("ReWrite",errno,"trunc",newFN); aOK = 0;}
604 }
605
606// Update the header
607//
608 HdrData.Free = 0;
609 if (aOK && !(aOK = reqWrite(0, 0)))
610 Say.Emsg("ReWrite",errno,"write header",newFN);
611
612// If all went well, rename the file
613//
614 if (aOK && rename(newFN, oldFN) < 0)
615 {Say.Emsg("ReWrite",errno,"rename",newFN); aOK = 0;}
616
617// Perform post processing
618//
619 if (aOK) close(oldFD);
620 else {close(newFD); reqFD = oldFD;}
621 reqFN = oldFN;
622 return aOK;
623}
#define DEBUG(x)
#define EPNAME(x)
XrdOucPup XrdCmsParser::Pup & Say
#define close(a)
Definition XrdPosix.hh:48
#define fsync(a)
Definition XrdPosix.hh:64
#define fstat(a, b)
Definition XrdPosix.hh:62
#define stat(a, b)
Definition XrdPosix.hh:101
#define rename(a, b)
Definition XrdPosix.hh:92
#define ftruncate(a, b)
Definition XrdPosix.hh:70
#define pwrite(a, b, c, d)
Definition XrdPosix.hh:107
#define pread(a, b, c, d)
Definition XrdPosix.hh:80
int Mode
size_t strlcpy(char *dst, const char *src, size_t sz)
#define FLOCK_t
void Ref(const char *iName)
Definition XrdFrcCID.cc:248
int Get(XrdFrcRequest *rP)
void Del(XrdFrcRequest *rP)
char * List(char *Buff, int bsz, int &Offs, XrdFrcRequest::Item *ITList=0, int ITNum=0)
void Add(XrdFrcRequest *rP)
XrdFrcReqFile(const char *fn, int aVal)
void ListL(XrdFrcRequest &tmpReq, char *Buff, int bsz, XrdFrcRequest::Item *ITList, int ITNum)
void Can(XrdFrcRequest *rP)
char LFN[3072]
static const int msgFail
static const int makeRW
static const int msgSucc
static const int Register
signed char Prty
char Notify[512]
long long addTOD