Tek-Tips is the largest IT community on the Internet today!

Members share and learn making Tek-Tips Forums the best source of peer-reviewed technical information on the Internet!

  • Congratulations strongm on being selected by the Tek-Tips community for having the most helpful posts in the forums last week. Way to Go!

Socketing (URGENT)

Status
Not open for further replies.

kathyayini

Programmer
Aug 16, 2002
52
IN
I am using sockets and multithreading. The problem is that when I fire 4 requests at the server, the recv function fails for 1 request (recv returns zero). I used the pthread_cond_wait function before the recv function but I still get the same problem. The client sends all 4 requests properly, but the server does not receive one of the four requests. Waiting for the reply. The functions are as follows:

pthread_cond_wait( &tCounter.cond, &tCounter.mutex);

nbytes = recv(socketId, filename, 101, MSG_NOSIGNAL);
 
I think there is some problem with the message you are sending;
your buffer length is 101 bytes.
You need to post the full source code
so that the error can be debugged.
 
Below is the code that is mainly used: 1) main, 2) RECEIVEBUFFER, and the client code.

I am facing the problems listed below, along with the recv function problem.
If anything is wrong in my code, please let me know.

1) Error while accessing the file (the access and fopen functions fail when I fire 4 requests at the server)
2) How to exit the particular thread that succeeded or returned an error
3) How to terminate the server when the CTRL + C buttons are pressed
4) Sometimes error -12560 occurs while doing a DML / DDL operation.
5) Sometimes error -1012 occurs while doing a DML / DDL operation.

//*****************************************************************************
Server Code :

/*
 * RECEIVEBUFFER - thread entry point that services one accepted connection.
 *
 * Flow, as written below:
 *   1. Select the Oracle runtime context and commit.
 *   2. recv() a file name from the socket, check it with access(), open it
 *      and read one line from it into msg.
 *   3. Decode msg with DecodeISO8583() into inputMap and pick the response
 *      message type from the request message type.
 *   4. For every request type except 804: look the reference number up in
 *      LOG_TRANSACTION, replay a stored response if one exists, otherwise
 *      call the generic service and record the outcome.
 *   5. Encode outputMap with EncodeISO8583() and print it.
 *
 * data: pointer to a struct sdata; only its socket_id member is read here.
 *
 * Exit: the function always leaves through pthread_exit(). The exit value is
 *       -2 (fopen failed), -3 (access() failed), -4 (unsupported message
 *       type), a helper's return code, sqlca.sqlcode, or the
 *       EncodeISO8583() result.
 *
 * NOTE(review): every "&quot;" in this listing is an HTML-escaped double
 * quote left by the forum page; it must be turned back into '"' before the
 * code can compile.
 * NOTE(review): nothing in this function calls send() or close() on
 * socketId, so the encoded response is only printed and the descriptor stays
 * open unless some code outside this listing closes it. A per-connection
 * descriptor leak would fit the reported "works 13 times, then fopen fails"
 * symptom - confirm.
 */
void *RECEIVEBUFFER(void* data)
{
/* Local SQLCA: sqlca.sqlcode and sqlca.sqlerrd[2] are inspected after the
   EXEC SQL statements below. */
struct sqlca sqlca;
/* src_count is not referenced anywhere in this function. */
int src_count;
/* NOTE(review): main() writes this object as `params.ctx` (member access),
   while this line uses `params->ctx`. params is a global rather than
   something reached through `data`, so a second connection accepted before
   this line runs could overwrite the context this thread picks up - confirm
   against the real declaration of params. */
sql_context ctx=params->ctx;

/* NOTE(review): `char *filename [101]` below declares an array of 101
   pointers, not 101 characters. It is only ever used as a byte buffer
   through (char*) casts, so `char filename[101]` was presumably intended. */
EXEC SQL BEGIN DECLARE SECTION;
char *filename [101];
int socketId = 0;
char cRefNo [15];
char cInput [4000];
char tstr [10];
int condition = 0;
EXEC SQL END DECLARE SECTION;

EXEC SQL CONTEXT USE :ctx;
EXEC SQL COMMIT;

/* NOTE(review): this re-initialises the shared tCounter.cond in every
   thread; POSIX leaves initialising an already-initialised condition
   variable undefined. */
pthread_cond_init(&tCounter.cond,tCounter.attr);

maptable inputMap;
maptable outputMap;
maptable::iterator pMapIter;


/* `condition` is a local that starts at 0, so inside this loop it is always
   below 10 and the inner `condition == 10` wait can never run. The loop only
   locks, increments, signals and unlocks ten times; it does not make the
   thread wait for anything. */
while( condition < 10 )
{
pthread_mutex_lock(&(tCounter.mutex));
while( condition == 10 )
pthread_cond_wait( &tCounter.cond, &tCounter.mutex);
condition ++;
pthread_cond_signal( &tCounter.cond );
pthread_mutex_unlock(&(tCounter.mutex));
}

/* The socket id is read from the caller's struct only now, after the COMMIT
   and the loop above.
   NOTE(review): if the slot `data` points at is reused for a newer
   connection before this line runs, two threads would read the same socket
   id; one would get the request and the other would see recv() return 0 once
   the peer closes. That would match the reported failure - confirm how
   getFree() hands out slots. */
socketId = ((struct sdata*) (data))->socket_id;
int nbytes;
/* Clears only the first 101 bytes of the (pointer-array) buffer. */
memset(filename,0,101);
/* NOTE(review): nbytes is never checked. recv() returns 0 when the peer has
   closed the connection and -1 on error; in both cases filename stays empty
   and the access() check below fails. A full 101-byte read would also leave
   no terminating NUL inside the first 101 bytes. */
nbytes = recv(socketId, filename, 101, MSG_NOSIGNAL);

int rc, i,rv;
char sendBuffer [10000];
char msg [2001];
FILE *fp;
/* NOTE(review): itransNo is only ever passed to SetFields() and
   UpdateTable(); on the tpurcode == 0 path below it reaches UpdateTable()
   without SetFields() having been called first, so it appears to be
   uninitialised there - confirm. */
char itransNo [15];

/* The received bytes are treated as a path to a file holding the request. */
if (access((char*)filename, F_OK) != 0)
{
printf(&quot;Error while accessing file\n&quot;);
pthread_exit((void *)-3);
}

fp = fopen((char*)filename, &quot;r&quot;);
if (fp == (FILE *)NULL) pthread_exit((void *)-2);

/* msg is a local of this thread; the lock here guards no shared data that
   is visible in this function. */
pthread_mutex_lock(&(tCounter.mutex));
strcpy(msg,&quot;&quot;);
pthread_mutex_unlock(&(tCounter.mutex));

/* Only one line of the file is read (fgets stops at the first newline). */
memset(msg, '\0', sizeof(msg));
fgets(msg, sizeof(msg)-1, fp);

fclose (fp);

/* Resets locals; erase(begin,end) followed by clear() empties each map
   twice. */
pthread_mutex_lock(&(tCounter.mutex));
strcpy(cInput,&quot;&quot;);
inputMap.erase(inputMap.begin(), inputMap.end());
outputMap.erase(outputMap.begin(), outputMap.end());
inputMap.clear();
outputMap.clear();
pthread_mutex_unlock(&(tCounter.mutex));

/* A negative return is treated as failure; otherwise rc is printed below as
   the message length. */
rc = DecodeISO8583(msg,inputMap,pMapIter);
if (rc < 0) { DisConnectDB(); pthread_exit((void *)rc); }

/* Dump the decoded request; the mutex keeps one thread's dump together. */
pthread_mutex_lock(&(tCounter.mutex));
printf(&quot;Message Length = %d\n&quot;, rc);
printf(&quot;Version = %d\n&quot;, iVersion);
printf(&quot;Message Type = %d\n&quot;, iInMsgType);
printf(&quot;Msg = &quot;);
/* NOTE(review): `msg < ' '` compares the array itself and `%c` is handed the
   array; an `[i]` subscript was presumably lost to the forum's italic
   markup (i.e. msg[i]) - confirm against the real source. */
for (i=0; i<rc; i++) if(msg < ' ') printf(&quot;.&quot;); else printf(&quot;%c&quot;, msg);
printf(&quot;\n&quot;);
for (pMapIter = inputMap.begin(); pMapIter != inputMap.end(); pMapIter++)
printf(&quot;Field %03d, Value %s\n&quot;, pMapIter->first, pMapIter->second.c_str());
pthread_mutex_unlock(&(tCounter.mutex));

/* Map the request message type to its response message type.
   The pthread_mutex_lock() below sits before the first case label, so it is
   never executed; the pthread_mutex_unlock() at the end follows the
   pthread_exit() of the default case, so it is never reached either. */
switch (iInMsgType)
{
pthread_mutex_lock(&(tCounter.mutex));
case 200 : iOutMsgType = 210; break;
case 220 :
case 221 : iOutMsgType = 230; break;
case 420 :
case 421 : iOutMsgType = 430; break;
case 604 : iOutMsgType = 614; break;
case 624 :
case 625 : iOutMsgType = 634; break;
case 804 : iOutMsgType = 814;
/* For 804 the value stored in output field 39 depends on the function
   code. */
if (iFunctionCode == 801 || iFunctionCode == 802 || iFunctionCode == 831 || iFunctionCode == 880)
strcpy(tstr, &quot;800&quot;);
else strcpy(tstr, &quot;902&quot;);
outputMap.insert(maptable::value_type(39, tstr));
break;
/* Unsupported message type: the thread exits here without calling
   DisConnectDB(). */
default : pthread_exit((void *)-4);
pthread_mutex_unlock(&(tCounter.mutex));
}

rc = CopyFields(inputMap,outputMap,pMapIter);
if (rc < 0)
{
DisConnectDB();
pthread_exit((void *)rc);
}
/* Everything up to the matching brace is skipped for message type 804. */
if (iInMsgType != 804)
{

/* Field 37 is used as the reference number.
   NOTE(review): the find() result is dereferenced without comparing it to
   inputMap.end(), and the strcpy() into cRefNo[15] is not length-checked. */
pthread_mutex_lock(&(tCounter.mutex));
pMapIter = inputMap.find(37);
strcpy(cRefNo, (char *)pMapIter->second.c_str());
pthread_mutex_unlock(&(tCounter.mutex));

/* Look for an existing log row for this reference number. */
pthread_mutex_lock(&(tCounter.mutex));
EXEC SQL SELECT INPUT INTO :cInput FROM LOG_TRANSACTION WHERE REF_NO = :cRefNo AND STATUS in('S','D');
pthread_mutex_unlock(&(tCounter.mutex));

/* sqlerrd[2] == 0 is treated as "no row found", in which case the request is
   inserted (presumably sqlerrd[2] is Oracle's rows-processed count -
   confirm). */
if(sqlca.sqlerrd[2] == 0)
{
rc=InsertInTable(inputMap,pMapIter,cRefNo);
if (rc < 0) { DisConnectDB(); pthread_exit((void *)rc); }
}
else if (sqlca.sqlcode < 0)
{
printf(&quot;Error %d while fetching record***\n&quot;,sqlca.sqlcode);
pthread_exit((void *)sqlca.sqlcode);
}

/* Look for a stored response (STATUS = 'D'); cInput is reused as the
   destination. */
pthread_mutex_lock(&(tCounter.mutex));
strcpy(cInput,&quot;&quot;);
EXEC SQL SELECT OUTPUT INTO :cInput FROM LOG_TRANSACTION WHERE REF_NO= :cRefNo AND STATUS = 'D';
pthread_mutex_unlock(&(tCounter.mutex));
if (sqlca.sqlerrd[2] > 0)
{
/* A stored response exists: rebuild outputMap from it, flip the row to
   STATUS 'N' and end the thread. Every path inside this branch calls
   pthread_exit(), so the final pthread_exit() before the closing brace is
   unreachable. */
rc=GenerateOutputMap(cInput,outputMap,pMapIter);
if (rc < 0) { DisConnectDB(); pthread_exit((void *)rc); }
else
{
pthread_mutex_lock(&(tCounter.mutex));
EXEC SQL UPDATE LOG_TRANSACTION SET STATUS='N' WHERE REF_NO=:cRefNo AND STATUS = 'D';
EXEC SQL COMMIT;
pthread_mutex_unlock(&(tCounter.mutex));
/* NOTE(review): sqlca is inspected only after the COMMIT, so this check
   presumably reflects the COMMIT rather than the UPDATE - confirm. */
if (sqlca.sqlcode < 0)
{
printf(&quot;Error %d while updating record in LOG_TRANSACTION table@.\n&quot;, sqlca.sqlcode);
pthread_exit((void *)sqlca.sqlcode);
}
else
{
pthread_exit((void *)rc);
}
}
pthread_exit((void *)rc);
}
else if (sqlca.sqlcode < 0)
{
printf(&quot;Error %d while ***fetching record.\n&quot;,sqlca.sqlcode);
pthread_exit((void *)sqlca.sqlcode);
}

/* No stored response: initialise Tuxedo and authenticate before calling the
   service. From here on the failure paths also call tpterm(). */
rc = initTuxedo();
if (rc != 0) { DisConnectDB(); pthread_exit((void *)rc); }

rc = authenticateServer(0);
if (rc != 0) { tpterm(); DisConnectDB(); pthread_exit((void *)rc); }

/* vcSourceProduct is not declared in this function, so it is written under
   the mutex. */
pthread_mutex_lock(&(tCounter.mutex));
strcpy((char *)vcSourceProduct.arr, &quot;SWIT&quot;);
vcSourceProduct.len = strlen((char *)vcSourceProduct.arr);
pthread_mutex_unlock(&(tCounter.mutex));

/* Start the send buffer with MLHEADER_LEN spaces followed by "00000";
   MapTransactionData() is then given the buffer to fill in. */
memset(sendBuffer, ' ', MLHEADER_LEN);
sendBuffer[MLHEADER_LEN] = '\0';
strcat(sendBuffer, &quot;00000&quot;);
rc = MapTransactionData(sendBuffer,inputMap,pMapIter,cRefNo);
if (rc < 0)
{
tpterm();
DisConnectDB();
pthread_exit((void*)rc);
}

/* Return codes -5 and -6 abort the thread; any other result marks the log
   row as done ('S' -> 'D') before the result is examined further. */
rc = CallGenericService(sendBuffer);
if ((rc == -5) || (rc == -6))
{
tpterm();
DisConnectDB();
pthread_exit((void*)rc);
}
else
{
pthread_mutex_lock(&(tCounter.mutex));
EXEC SQL
UPDATE LOG_TRANSACTION SET STATUS = 'D' WHERE REF_NO=:cRefNo AND STATUS = 'S';
EXEC SQL COMMIT;
pthread_mutex_unlock(&(tCounter.mutex));
if (sqlca.sqlcode < 0)
{
printf(&quot;Error %d while updating LOG_TRANSACTION table&quot;,sqlca.sqlcode);
pthread_exit((void *)sqlca.sqlcode);
}
}
/* rc == 0: copy the service reply into outputMap and store it. */
if (rc == 0)
{
rv=SetFields(sendBuffer,itransNo,outputMap,pMapIter);
if (rv < 0)
{
DisConnectDB();
pthread_exit((void*)rv);
}
else
{
rv= UpdateTable(itransNo,outputMap,pMapIter,cRefNo);
if (rv < 0)
{
DisConnectDB();
pthread_exit((void*)rv);
}
}
}
/* rc == -1: the handling depends on tpurcode. */
else if(rc == -1)
{
if(tpurcode == 0)
{
/* Field 39 is set to "912" and the row is updated; SetFields() is not
   called on this path. */
outputMap.insert(maptable::value_type(39,&quot;912&quot;));
rv= UpdateTable(itransNo,outputMap,pMapIter,cRefNo);
if (rv < 0) { DisConnectDB(); pthread_exit((void *)rv); }
}
else if(tpurcode == -1)
{
rv=SetFields(sendBuffer,itransNo,outputMap,pMapIter);
if (rv < 0)
{
DisConnectDB(); pthread_exit((void *)rv);
}
else
{
rv= UpdateTable(itransNo,outputMap,pMapIter,cRefNo);
if (rv < 0) { DisConnectDB(); pthread_exit((void *)rv); }
}
}
}
}
/* Encode the response into msg (reusing the request buffer); rc is printed
   below as the message length. tpterm() is skipped for 804 because Tuxedo
   was never initialised on that path. */
rc = EncodeISO8583(msg,outputMap,pMapIter);
if (iInMsgType != 804) tpterm();
DisConnectDB();

/* Dump the encoded response. */
pthread_mutex_lock(&(tCounter.mutex));
printf(&quot;Message Length = %d\n&quot;, rc);
printf(&quot;Version = %d\n&quot;, iVersion);
printf(&quot;Message Type = %d\n&quot;, iOutMsgType);
printf(&quot;Msg = &quot;);
/* NOTE(review): same apparent loss of the `[i]` subscript as in the request
   dump above. */
for (i=0; i<rc; i++) if(msg < ' ') printf(&quot;.&quot;); else printf(&quot;%c&quot;, msg);
printf(&quot;\n&quot;);
for (pMapIter = outputMap.begin(); pMapIter != outputMap.end(); pMapIter++)
printf(&quot;Field %03d, Value %s\n&quot;, pMapIter->first, pMapIter->second.c_str());
pthread_mutex_unlock(&(tCounter.mutex));

/* pthread_exit() does not return, so the return statement is never
   reached. */
pthread_exit((void*)rc);
return (NULL);
}

/*
 * Server entry point.
 *
 * Creates a TCP socket, binds it to PORT_NO on any local address, listens,
 * and then accepts connections in a loop. For each accepted connection it
 * allocates and connects an Oracle runtime context, starts a RECEIVEBUFFER
 * thread for the connection, sends the 10-byte acknowledgement "Success@@@"
 * to the client and detaches the thread.
 *
 * Returns -1 if the socket cannot be created, bound or put into listening
 * state; returns 0 after the accept loop has been left.
 *
 * NOTE(review): every "&quot;" in this listing is an HTML-escaped double
 * quote left by the forum page; it must be turned back into '"' before the
 * code can compile.
 * NOTE(review): the accepted descriptor sock_id is not closed anywhere in
 * this function.
 */
int main()
{
int rc, i,rv;
int serverSocket;
/* NOTE(review): accept() takes a socklen_t* for its length argument; this
   is a plain int. */
int addrLen;
struct sockaddr_in sockAddr;
int threadCount=0;
int max_concurrent_alwd=0;
int nextIndex;
/* receiveBufferThread is not used below; `cur` holds the id of the most
   recently created thread only. */
pthread_t receiveBufferThread[MAX_THREADS],cur;
pthread_attr_t attr;
size_t stacksize;
int segment_size=0;
int optval;
int total;
int iCtxCount=1;

/* NOTE(review): ctx is declared as an array of THREADS contexts, but every
   use below is an unsubscripted `ctx`. An `[i]` subscript may have been
   lost to the forum's italic markup - confirm against the real source. */
EXEC SQL BEGIN DECLARE SECTION;
sql_context ctx[THREADS];
int sock_id = 0;
EXEC SQL END DECLARE SECTION;
pthread_t thread_id[THREADS];

pthread_attr_t patr;
EXEC SQL ENABLE THREADS;
pthread_mutex_init(&tCounter.mutex,NULL);


/* A 500000-byte stack size is configured in attr here, but pthread_create()
   below is called with NULL attributes, so this setting is not applied to
   the worker threads. */
pthread_attr_init(&attr);
stacksize = 500000;
pthread_attr_setstacksize (&attr, stacksize);

sockAddr.sin_family=AF_INET;
sockAddr.sin_addr.s_addr=INADDR_ANY;
sockAddr.sin_port=htons(PORT_NO);
addrLen=sizeof(struct sockaddr_in);

if((serverSocket=socket(AF_INET,SOCK_STREAM,0))== -1)
{
printf(&quot;Failed to create Socket\n&quot;);
return -1;
}
else
{
printf(&quot;Socket created successfully\n&quot;);
}

/* SO_REUSEADDR is requested; the setsockopt() result in rc is not
   checked. */
optval = 1;
rc = setsockopt(serverSocket, SOL_SOCKET, SO_REUSEADDR,(char *)&optval, sizeof(optval));

/* NOTE(review): bind() and accept() take a struct sockaddr*; the casts in
   this function are to struct sockaddr_in*. */
if(bind(serverSocket,(struct sockaddr_in*)&sockAddr,sizeof(sockAddr))==-1)
{
printf(&quot;Failed to bind at port %d\n&quot;,PORT_NO);
return -1;
}
else
{
printf(&quot;Binding at port %d done successfully\n&quot;,PORT_NO);
}
tCounter.threadCount=0;

if(listen(serverSocket,1000) == -1)
{
printf(&quot;Failed to listen at port %d\n&quot;,PORT_NO);
return -1;
}
else
{
printf(&quot;Listening at port %d\n&quot;,PORT_NO);
}

/* Accept loop: one iteration per connection. */
while(1)
{
sock_id = accept(serverSocket,(struct sockaddr_in*)&sockAddr,&addrLen);

/* The counter snapshot taken here is not used inside the loop. */
pthread_mutex_lock(&(tCounter.mutex));
threadCount = tCounter.threadCount;
pthread_mutex_unlock(&(tCounter.mutex));
if(sock_id == -1)
{
/* The loop is left only when accept() is interrupted by a signal.
   NOTE(review): no signal handler is installed in this listing, so whether
   Ctrl+C ever reaches this branch cannot be told from here. */
if(errno == EINTR)
{
printf(&quot;Signal Recived coming out \n&quot;);
break;
}
else
{
continue;
}
}
/* NOTE(review): getFree() is not shown; its result is used as an index
   without any check. If it can hand out a slot that an earlier thread has
   not read yet, that thread would pick up this connection's socket id. */
nextIndex = getFree();

newSocket[nextIndex].socket_id = sock_id;
newSocket[nextIndex].segment_size=segment_size;


/* This loop and the next one each run exactly once (i < 1). */
for(i=0;i<1;i++)
{
/* A new context is allocated and connected for every connection; nothing in
   this function frees it, and the CONNECT result is not checked. */
EXEC SQL CONTEXT ALLOCATE :ctx;
EXEC SQL CONTEXT USE :ctx;
EXEC SQL CONNECT :CONNINFO;
printf(&quot;Connected!\n&quot;);

}
for(i=0;i<1;i++)
{
/* NOTE(review): params is one shared object that is overwritten for every
   connection, while RECEIVEBUFFER reads its ctx member after the thread has
   started; two connections arriving close together could therefore end up
   using the same context - confirm. */
params.ctx=ctx;
params.thread_id=i;

/* The pthread_create() result is not checked. The acknowledgement is sent
   from this thread immediately after the worker is created, not after the
   request has been processed. */
pthread_create(&cur, NULL, RECEIVEBUFFER,(void*)&newSocket[nextIndex]);
send(sock_id,&quot;Success@@@&quot;,10,MSG_NOSIGNAL);
}
iCtxCount++;
pthread_detach(cur);

pthread_mutex_lock(&(tCounter.mutex));
tCounter.threadCount++;
pthread_mutex_unlock(&(tCounter.mutex));
}

pthread_mutex_lock(&(tCounter.mutex));
threadCount = tCounter.threadCount;
pthread_mutex_unlock(&(tCounter.mutex));
/* NOTE(review): this joins the same id `cur` threadCount+1 times, and that
   thread was already detached above; POSIX leaves pthread_join() on a
   thread that is not joinable undefined. */
for(i=0;i<=threadCount;i++)
{
pthread_join(cur,NULL);
}

/* TP_TERMINATE is not defined in the server listing; the version shown in
   the client listing simply calls close() on its argument. */
TP_TERMINATE(serverSocket);

DisConnectDB();
return 0;
}

//*****************************************************************************

Client code :

#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<unistd.h>
#include<stdlib.h>
#include<stdio.h>
#include<pthread.h>
#include <stdio.h>
#include <string.h>
#include <arpa/inet.h>

/* Client configuration constants. */
#define MESSAGE_LENGTH 101      /* size of the request/acknowledgement buffer, in bytes */
#define PORT_NO 8891            /* TCP port TP_INIT() connects to */
#define BACKLOG 5               /* not referenced by the client code in this listing */
#define SERVER_IP "127.0.0.1"   /* dotted-quad address TP_INIT() connects to */
#define NO_OF_RETRY 5           /* maximum number of connect() attempts in TP_INIT() */
#define CHUNK_LENGTH 10         /* not referenced by the client code in this listing */

/* Opens a connection to the server and returns the socket descriptor. */
int TP_INIT(void);
/* Sends a request on a socket, prints the acknowledgement, returns bytes sent. */
int TP_CALL(char*,int,int);
/* Closes a socket descriptor and returns the result of close(). */
int TP_TERMINATE(int);

/*
 * TP_INIT - open a TCP connection to SERVER_IP:PORT_NO.
 *
 * Makes up to NO_OF_RETRY connection attempts. Each attempt uses a fresh
 * socket, because POSIX leaves the state of a socket unspecified after a
 * failed connect().
 *
 * Returns the connected socket descriptor, or -1 if the address cannot be
 * parsed, a socket cannot be created, or every connection attempt fails.
 */
int TP_INIT(void)
{
    struct sockaddr_in sockAddr;
    int retryCount;

    /* Zero the whole structure so no field is left with stack garbage. */
    memset(&sockAddr, 0, sizeof(sockAddr));
    sockAddr.sin_family = AF_INET;
    sockAddr.sin_port = htons(PORT_NO);
    if (inet_pton(AF_INET, SERVER_IP, &sockAddr.sin_addr) != 1)
    {
        fprintf(stderr, "Invalid server address %s\n", SERVER_IP);
        return -1;
    }

    for (retryCount = 0; retryCount < NO_OF_RETRY; retryCount++)
    {
        int newSocket = socket(AF_INET, SOCK_STREAM, 0);
        if (newSocket < 0)
        {
            perror("socket");
            return -1;
        }

        if (connect(newSocket, (struct sockaddr *)&sockAddr, sizeof(sockAddr)) == 0)
        {
            return newSocket;
        }

        /* This attempt failed: release the descriptor before retrying. */
        close(newSocket);
    }

    fprintf(stderr, "Could not connect to %s:%d after %d attempts\n",
            SERVER_IP, PORT_NO, NO_OF_RETRY);
    return -1;
}
/*
 * TP_CALL - send one request and print the server's acknowledgement.
 *
 * buffer:       on entry, the NUL-terminated request; on return it holds the
 *               acknowledgement text. It must be at least MESSAGE_LENGTH
 *               bytes long because the reply is received into it.
 * bufferLength: number of bytes of buffer to send.
 * socketId:     connected socket descriptor.
 *
 * Returns the value returned by send(): the number of bytes sent, or -1 if
 * sending failed (in which case no reply is awaited).
 */
int TP_CALL(char *buffer,int bufferLength,int socketId)
{
    int bytesSent;
    int bytesReceived;

    printf("\nRequest is :=%s Buffer length= %d\n", buffer, bufferLength);

    bytesSent = (int)send(socketId, buffer, bufferLength, 0);
    printf("Send data size is :%d\n", bytesSent);
    if (bytesSent < 0)
    {
        perror("send");
        return bytesSent;
    }

    /* Leave one byte free so the reply can always be NUL-terminated before
       it is printed with %s. */
    bytesReceived = (int)recv(socketId, buffer, MESSAGE_LENGTH - 1, 0);
    if (bytesReceived < 0)
    {
        perror("recv");
        return bytesSent;
    }
    buffer[bytesReceived] = '\0';
    printf("\nAcknowledgement message= %s\n", buffer);

    return bytesSent;
}

/*
 * TP_TERMINATE - close a socket descriptor.
 *
 * Returns whatever close() returns for socketId.
 */
int TP_TERMINATE(int socketId)
{
    const int closeStatus = close(socketId);
    return closeStatus;
}

/*
 * Client entry point.
 *
 * Usage: <program> a|b
 *
 * Sends two requests, each on its own connection. The mode argument selects
 * which pair of request names is sent. Before each request the client waits
 * one second, then sends it with TP_CALL() and closes the connection.
 *
 * Returns 0 on success, 1 if the mode argument is missing or unknown or if
 * a connection cannot be opened.
 */
int main(int argc,char * argv[])
{
    static const char *const requestsA[2] = {
        "CWD_RQST_FROM_ELECTRA",
        "ECHOTEST_RQST_FROM_ELECTRA"
    };
    static const char *const requestsB[2] = {
        "ECHOTEST_RQST_FROM_ELECTRA1",
        "MINISTMT_RQST_FROM_ELECTRA"
    };
    const char *const *requests = NULL;

    /* argv[1] must exist before it is compared. */
    if (argc >= 2)
    {
        if (strcmp(argv[1], "a") == 0)
            requests = requestsA;
        else if (strcmp(argv[1], "b") == 0)
            requests = requestsB;
    }
    if (requests == NULL)
    {
        fprintf(stderr, "Usage: %s a|b\n", (argc > 0 && argv[0]) ? argv[0] : "client");
        return 1;
    }

    for (int z = 0; z < 2; z++)
    {
        char buffer[MESSAGE_LENGTH];
        int sock = TP_INIT();

        if (sock < 0)
        {
            fprintf(stderr, "Unable to connect to the server\n");
            return 1;
        }

        /* Zero the whole buffer so the copied request is NUL-terminated. */
        memset(buffer, 0, sizeof(buffer));
        strncpy(buffer, requests[z], sizeof(buffer) - 1);

        sleep(1);
        TP_CALL(buffer, (int)strlen(buffer), sock);
        TP_TERMINATE(sock);
    }
    return 0;
}


 
The above-mentioned program runs fine thirteen times, but the 14th time the fopen function in RECEIVEBUFFER fails. Waiting for the reply.
 
Status
Not open for further replies.

Part and Inventory Search

Sponsor

Back
Top