60 opcode( None ), hosts( new
XrdCl::
HostList( hostList ) ), handler( handler )
63 memset( ptr, 0,
sizeof(
aiocb ) );
67 env->
GetInt(
"AioSignal", useSignals );
71 static SignalHandlerRegistrator registrator;
88 void SetWrite(
int fd,
size_t offset,
size_t size,
const void *buffer )
91 cb->aio_offset = offset;
92 cb->aio_buf =
const_cast<void*
>( buffer );
93 cb->aio_nbytes = size;
94 opcode = Opcode::Write;
97 void SetRead(
int fd,
size_t offset,
size_t size,
void *buffer )
100 cb->aio_offset = offset;
101 cb->aio_buf = buffer;
102 cb->aio_nbytes = size;
103 opcode = Opcode::Read;
106 void SetFsync(
int fd )
109 opcode = Opcode::Sync;
112 static void ThreadHandler( sigval arg )
114 std::unique_ptr<AioCtx> me(
reinterpret_cast<AioCtx*
>( arg.sival_ptr ) );
115 Handler( std::move( me ) );
118 static void SignalHandler(
int sig, siginfo_t *info,
void *ucontext )
120 std::unique_ptr<AioCtx> me(
reinterpret_cast<AioCtx*
>( info->si_value.sival_ptr ) );
121 Handler( std::move( me ) );
131 struct SignalHandlerRegistrator
133 SignalHandlerRegistrator()
135 struct sigaction newact, oldact;
136 newact.sa_sigaction = SignalHandler;
137 sigemptyset( &newact.sa_mask );
138 newact.sa_flags = SA_SIGINFO;
139 int rc = sigaction( SIGUSR1, &newact, &oldact );
141 throw std::runtime_error(
XrdSysE2T( errno ) );
145 static void Handler( std::unique_ptr<AioCtx> me )
147 if( me->opcode == Opcode::None )
150 using namespace XrdCl;
152 int rc = aio_return( me->cb.get() );
155 int errcode = aio_error( me->cb.get() );
156 Log *log = DefaultEnv::GetLog();
157 log->
Error( FileMsg, GetErrMsg( me->opcode ),
XrdSysE2T( errcode ) );
159 QueueTask( error, 0, me->hosts, me->handler );
165 if( me->opcode == Opcode::Read )
168 const_cast<void*
>( me->cb->aio_buf ) );
173 QueueTask(
new XRootDStatus(), resp, me->hosts, me->handler );
177 static const char* GetErrMsg( Opcode opcode )
179 static const char readmsg[] =
"Read: failed %s";
180 static const char writemsg[] =
"Write: failed %s";
181 static const char syncmsg[] =
"Sync: failed %s";
185 case Opcode::Read:
return readmsg;
187 case Opcode::Write:
return writemsg;
189 case Opcode::Sync:
return syncmsg;
198 using namespace XrdCl;
203 syncHandler->HandleResponse( status, resp );
204 }
else if(
auto postmaster = DefaultEnv::GetPostMaster()) {
205 if (
JobManager *jmngr = postmaster->GetJobManager()) {
207 jmngr->QueueJob( task );
212 std::unique_ptr<aiocb> cb;
257 uint16_t flags = ntohs( request->
options );
258 uint16_t mode = ntohs( request->
mode );
268 if(
close( fd ) == -1 )
288 if(
fstat( fd, &ssp ) == -1 )
294 std::ostringstream data;
295 data << ssp.st_dev <<
" " << ssp.st_size <<
" " << ssp.st_mode <<
" "
302 log->
Error(
FileMsg,
"Stat: ParseServerResponse failed." );
309 resp->
Set( statInfo );
319#if defined(__APPLE__)
322 if( (
read =
pread( fd, buffer, size, offset ) ) == -1 )
333 AioCtx *ctx =
new AioCtx( pHostList, handler );
334 ctx->SetRead( fd, offset, size, buffer );
336 int rc = aio_read( *ctx );
360#if defined(__APPLE__)
361 ssize_t ret =
lseek( fd, offset, SEEK_SET );
365 ssize_t ret = preadv( fd,
iov, iovcnt, offset );
375 uint64_t choff = offset;
377 for(
int i = 0; i < iovcnt; ++i )
379 uint32_t chlen =
iov[i].iov_len;
380 if( chlen > left ) chlen = left;
381 info->
GetChunks().emplace_back( choff, chlen,
iov[i].iov_base);
396#if defined(__APPLE__)
397 const char *buff =
reinterpret_cast<const char*
>( buffer );
398 size_t bytesWritten = 0;
399 while( bytesWritten < size )
401 ssize_t ret =
pwrite( fd, buff, size, offset );
415 AioCtx *ctx =
new AioCtx( pHostList, handler );
416 ctx->SetWrite( fd, offset, size, buffer );
418 int rc = aio_write( *ctx );
437#if defined(__APPLE__)
449 AioCtx *ctx =
new AioCtx( pHostList, handler );
451 int rc = aio_fsync( O_SYNC, *ctx );
471 log->
Error(
FileMsg,
"Truncate: failed, file descriptor: %i, %s", fd,
487 size_t totalSize = 0;
488 bool useBuffer( buffer );
490 for(
auto itr = chunks.begin(); itr != chunks.end(); ++itr )
494 buffer = chunk.buffer;
495 ssize_t bytesRead =
pread( fd, buffer, chunk.length,
500 log->
Error(
FileMsg,
"VectorRead: failed, file descriptor: %i, %s",
505 totalSize += bytesRead;
506 info->GetChunks().push_back(
ChunkInfo( chunk.offset, bytesRead, buffer ) );
508 buffer =
reinterpret_cast<char*
>( buffer ) + bytesRead;
511 info->SetSize( totalSize );
513 resp->
Set( info.release() );
524 for(
auto itr = chunks.begin(); itr != chunks.end(); ++itr )
527 ssize_t bytesWritten =
pwrite( fd, chunk.buffer, chunk.length,
529 if( bytesWritten < 0 )
532 log->
Error(
FileMsg,
"VectorWrite: failed, file descriptor: %i, %s",
550 size_t iovcnt = chunks->size();
553 for(
size_t i = 0; i < iovcnt; ++i )
555 iovcp[i].iov_base = (*chunks)[i].buffer;
556 iovcp[i].iov_len = (*chunks)[i].length;
557 size += (*chunks)[i].length;
559 iovec *iovptr = iovcp;
561 ssize_t bytesWritten = 0;
562 while( bytesWritten < size )
565 ssize_t ret =
lseek( fd, offset, SEEK_SET );
567 ret =
writev( fd, iovptr, iovcnt );
569 ssize_t ret = pwritev( fd, iovptr, iovcnt, offset );
582 if(
size_t( ret ) > iovptr[0].iov_len )
584 ret -= iovptr[0].iov_len;
590 iovptr[0].iov_len -= ret;
591 iovptr[0].iov_base =
reinterpret_cast<char*
>( iovptr[0].iov_base ) + ret;
626 std::vector<XAttrStatus> response;
628 auto itr = attrs.begin();
629 for( ; itr != attrs.end(); ++itr )
631 std::string name = std::get<xattr_name>( *itr );
632 std::string value = std::get<xattr_value>( *itr );
633 int err = xattr->
Set( name.c_str(), value.c_str(), value.size(), 0, fd );
641 resp->
Set(
new std::vector<XAttrStatus>( std::move( response ) ) );
654 std::vector<XAttr> response;
656 auto itr = attrs.begin();
657 for( ; itr != attrs.end(); ++itr )
659 std::string name = *itr;
660 std::unique_ptr<char[]> buffer;
662 int size = xattr->
Get( name.c_str(), 0, 0, 0, fd );
666 response.push_back(
XAttr( *itr,
"", status ) );
669 buffer.reset(
new char[size] );
670 int ret = xattr->
Get( name.c_str(), buffer.get(), size, 0, fd );
676 value.append( buffer.get(), ret );
680 response.push_back(
XAttr( *itr, value, status ) );
684 resp->
Set(
new std::vector<XAttr>( std::move( response ) ) );
697 std::vector<XAttrStatus> response;
699 auto itr = attrs.begin();
700 for( ; itr != attrs.end(); ++itr )
702 std::string name = *itr;
703 int err = xattr->
Del( name.c_str(), 0, fd );
711 resp->
Set(
new std::vector<XAttrStatus>( std::move( response ) ) );
723 std::vector<XAttr> response;
726 int err = xattr->
List( &alist, 0, fd, 1 );
737 std::string name( ptr->
Name, ptr->
Nlen );
738 int vlen = ptr->
Vlen;
741 std::unique_ptr<char[]> buffer(
new char[vlen] );
742 int ret = xattr->
Get( name.c_str(),
743 buffer.get(), vlen, 0, fd );
745 std::string value = ret >= 0 ? std::string( buffer.get(), ret ) :
749 response.push_back(
XAttr( name, value, status ) );
751 xattr->
Free( alist );
754 resp->
Set(
new std::vector<XAttr>( std::move( response ) ) );
770 syncHandler->HandleResponse( st, resp );
777 postmaster->GetJobManager()->QueueJob( task );
789 size_t pos = path.rfind(
'/' );
790 while( pos != std::string::npos && pos != 0 )
792 std::string tmp = path.substr( 0, pos );
794 int rc = lstat( tmp.c_str(), &st );
796 if( errno != ENOENT )
798 pos = path.rfind(
'/', pos - 1 );
801 pos = path.find(
'/', pos + 1 );
802 while( pos != std::string::npos && pos != 0 )
804 std::string tmp = path.substr( 0, pos );
805 if(
mkdir( tmp.c_str(), 0755 ) )
807 if( errno != EEXIST )
810 pos = path.find(
'/', pos + 1 );
815 XRootDStatus LocalFileHandler::OpenImpl(
const std::string &url, uint16_t flags,
824 if( !fileUrl.IsValid() )
827 if( fileUrl.GetHostName() !=
"localhost" )
830 std::string path = fileUrl.GetPath();
835 uint16_t openflags = 0;
837 openflags |= O_CREAT | O_EXCL;
839 openflags |= O_WRONLY;
843 openflags |= O_RDONLY;
845 openflags |= O_CREAT | O_TRUNC;
852 log->
Error(
FileMsg,
"Open MkdirPath failed %s: %s", path.c_str(),
863 fd = XrdSysFD_Open( path.c_str(), openflags, mode );
866 log->
Error(
FileMsg,
"Open: open failed: %s: %s", path.c_str(),
876 if(
fstat( fd, &ssp ) == -1 )
883 std::ostringstream data;
884 data << ssp.st_dev <<
" " << ssp.st_size <<
" " << ssp.st_mode <<
" "
890 log->
Error(
FileMsg,
"Open: ParseServerResponse failed." );
896 pHostList.push_back(
HostInfo( pUrl,
false ) );
902 resp->
Set( openInfo );
927 std::vector<std::string> attrs;
929 for(
kXR_char i = 0; i < numattr; ++i )
936 size_t len = strlen( body );
938 attrs.push_back( std::string( body, len ) );
951 std::vector<xattr_t> attrs;
953 for(
kXR_char i = 0; i < numattr; ++i )
962 attrs.push_back( std::make_tuple( std::string( name ), std::string() ) );
963 bodylen -= strlen( name ) + 1;
967 for(
kXR_char i = 0; i < numattr; ++i )
979 std::get<xattr_value>( attrs[i] ) = value;
1029 auto &chunkList = *sendParams.
chunkList;
1030 struct iovec
iov[chunkList.size()];
1031 for(
size_t i = 0; i < chunkList.size() ; ++i )
1033 iov[i].iov_base = chunkList[i].buffer;
1034 iov[i].iov_len = chunkList[i].length;
1036 return ReadV( chunkList.front().offset,
iov, chunkList.
size(),
1037 handler, sendParams.
timeout );
1042 handler, sendParams.
timeout );
1048 if( chunks->size() == 1 )
1052 chunks->front().buffer, handler,
1057 handler, sendParams.
timeout );
1079 handler, sendParams.
timeout );
struct ClientTruncateRequest truncate
struct ClientFattrRequest fattr
struct ClientOpenRequest open
struct ClientRequestHdr header
struct ClientReadRequest read
struct ClientWriteRequest write
#define pwrite(a, b, c, d)
#define pread(a, b, c, d)
struct sigevent aio_sigevent
const char * XrdSysE2T(int errcode)
static int mapError(int rc)
void Set(Type object, bool own=true)
Binary blob representation.
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
static Env * GetEnv()
Get default client environment.
bool GetInt(const std::string &key, int &value)
XRootDStatus Truncate(uint64_t size, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus VectorRead(const ChunkList &chunks, void *buffer, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus Stat(ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus Sync(ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus ReadV(uint64_t offset, struct iovec *iov, int iovcnt, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus QueueTask(XRootDStatus *st, AnyObject *obj, ResponseHandler *handler)
XRootDStatus SetXAttr(const std::vector< xattr_t > &attrs, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus MkdirPath(const std::string &path)
XRootDStatus Read(uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus WriteV(uint64_t offset, ChunkList *chunks, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus DelXAttr(const std::vector< std::string > &attrs, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus ListXAttr(ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus Fcntl(const Buffer &arg, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus Close(ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus Visa(ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus Write(uint64_t offset, uint32_t size, const void *buffer, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus VectorWrite(const ChunkList &chunks, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus ExecRequest(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams)
Translate an XRootD request into LocalFileHandler call.
XRootDStatus Open(const std::string &url, uint16_t flags, uint16_t mode, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus GetXAttr(const std::vector< std::string > &attrs, ResponseHandler *handler, uint16_t timeout=0)
void Error(uint64_t topic, const char *format,...)
Report an error.
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
The message representation used throughout the system.
uint16_t GetVirtReqID() const
Get virtual request ID for the message.
Information returned by file open operation.
Handle an async response.
bool ParseServerResponse(const char *data)
Parse server response and fill up the object.
Synchronize the response.
std::string GetURL() const
Get the URL.
ChunkList & GetChunks()
Get chunks.
void SetSize(uint32_t size)
Set size.
char Name[1]
Start of the name (size of struct is dynamic)
int Vlen
The length of the attribute value;.
virtual int List(AList **aPL, const char *Path, int fd=-1, int getSz=0)=0
virtual int Get(const char *Aname, void *Aval, int Avsz, const char *Path, int fd=-1)=0
virtual int Set(const char *Aname, const void *Aval, int Avsz, const char *Path, int fd=-1, int isNew=0)=0
int Nlen
The length of the attribute name that follows.
virtual void Free(AList *aPL)=0
virtual int Del(const char *Aname, const char *Path, int fd=-1)=0
AList * Next
-> next element.
ReadImpl< false > Read(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< void * > buffer, uint16_t timeout=0)
Factory for creating ReadImpl objects.
const uint16_t stError
An error occurred that could potentially be retried.
SyncImpl< false > Sync(Ctx< File > file, uint16_t timeout=0)
Factory for creating SyncImpl objects.
std::vector< HostInfo > HostList
const uint16_t errDataError
data is corrupted
const uint16_t stOK
Everything went OK.
const uint16_t errOSError
const uint16_t errInvalidArgs
WriteImpl< false > Write(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< const void * > buffer, uint16_t timeout=0)
Factory for creating WriteImpl objects.
std::vector< ChunkInfo > ChunkList
List of chunks.
const uint16_t errNotSupported
const uint16_t errLocalError
const int DefaultAioSignal
static char * NVecRead(char *buffer, kXR_unt16 &rc)
static char * VVecRead(char *buffer, kXR_int32 &len)
Describe a data chunk for vector read.
uint16_t code
Error type, or additional hints on what to do.
bool IsOK() const
We're fine.
uint32_t errNo
Errno, if any.
Extended attribute operation status.
Extended attributes with status.