// Origin: www.pudn.com > yidong.rar > TestSend.java


package sample;
import aiismg.jcmppapi30.CMPPAPI;
import aiismg.jcmppapi30.CMPPDeliverResp;
import java.io.*;
import java.net.*;
import java.util.Date; 
import java.util.*; 

/**
 * Smoke-test driver for the CMPP send path.
 *
 * It initialises the CMPP API from the ini file, pre-fills the shared
 * {@link Queue} with a fixed batch of test payloads, starts the
 * {@link DaemonThread} supervisor and then waits for it before exiting.
 */
public class TestSend {
	/** Payloads pushed onto the queue, in this order, on every fill round. */
	private static final String[] ROUND_MESSAGES = {
		"QA", "QB", "QC", "QD", "QE", "QF", "QG", "QGG",
		"QH", "QI", "QJ", "QK", "QL", "QM", "QN", "QO",
		"QP", "QQ", "QR", "QS", "QT", "QU", "QV", "end"
	};

	/** Number of times {@link #ROUND_MESSAGES} is appended to the queue. */
	private static final int FILL_ROUNDS = 500;

	/**
	 * Program entry point.
	 *
	 * @param argv command-line arguments (not used)
	 * @throws IOException kept in the signature for source compatibility
	 */
	public static void main( String argv[] ) throws IOException
	{
		LogToFile logToFile = new LogToFile("TestSend");
		Queue myQueue = new Queue();
		CMPPAPI herepCMPPAPI=new CMPPAPI();

		// An initialisation failure is only logged: the original deliberately
		// disabled the System.exit(1) here, so the run continues regardless.
		if( herepCMPPAPI.InitCMPPAPI( "../config/javacmppc.ini" ) != 0 )
		{
			logToFile.DoLog("Fail to call InitCMPPAPI!");
		}
		else
		{
			logToFile.DoLog("Success to call InitCMPPAPI!");
		}

		System.out.println("我准备向队列里面写入东西");

		for (int round = 0; round < FILL_ROUNDS; round++)
		{
			for (int m = 0; m < ROUND_MESSAGES.length; m++)
			{
				myQueue.enq(ROUND_MESSAGES[m]);
			}
		}

		System.out.println("主进程队列处理完毕,队列长度是"+myQueue.GetLength());
		DaemonThread daemonSendThreadA = new DaemonThread(myQueue);
		daemonSendThreadA.setDaemon(true);
		daemonSendThreadA.start();
		System.out.println("主进程队列处理完毕等待守护进程工作");

		try
		{
			daemonSendThreadA.join();
		}
		catch ( InterruptedException ex)
		{
			// Do not swallow the interruption: record it and restore the flag
			// so that anything running before exit can still observe it.
			logToFile.DoLog("Interrupted while waiting for the daemon thread");
			Thread.currentThread().interrupt();
		}

		System.out.println("主进程等待守护进程结束,它"+daemonSendThreadA.isAlive()+"活着");
		System.exit(0);
	}//end of main

}//end of class TestSend


/*
here is the Queue Class code
*/
/**
 * Process-wide FIFO queue backed by a single static {@link Vector}.
 *
 * All state is static, so every {@code Queue} instance (and every static
 * call) operates on the same underlying list. The backing vector is created
 * when the class is loaded, which makes the static methods safe to call even
 * if no instance has been constructed yet.
 */
class Queue {
	/** Shared backing store; index 0 is the head of the queue. */
	static Vector hereVector = new Vector();
	/** Cached element count, refreshed after every mutation. */
	static int hereLength;

	/**
	 * Creates a handle on the shared queue. The backing store is only
	 * (re)created if it has been nulled out; existing content is untouched.
	 */
	public Queue()
	{
		synchronized (Queue.class)
		{
			if (hereVector==null)
			{
				hereVector= new Vector();
				hereLength = 0;
			}
		}
	}

	/**
	 * Appends an element at the tail of the queue.
	 *
	 * @param x element to add
	 */
	public static  synchronized void enq(Object x) {
		hereVector.add(x);
		hereLength = hereVector.size();
	}

	/**
	 * Removes and returns the head of the queue.
	 *
	 * @return the oldest element, or {@code null} when the queue is empty
	 */
	public static synchronized Object deq() {
		if( empty() )
			{return null;}

		Object x = hereVector.remove(0);
		hereLength = hereVector.size();
		return x;
	}

	/**
	 * Returns the head of the queue without removing it.
	 *
	 * @return the oldest element, or {@code null} when the queue is empty
	 */
	public static  synchronized Object front() {
		if( empty() )
			{return null;}
		return hereVector.elementAt(0);
	}

	/** @return {@code true} when the queue holds no elements */
	public static  boolean empty() {
		return hereVector.isEmpty();
	}

	/** Removes every element and resets the reported length to zero. */
	public static  synchronized void clear() {
		hereVector.removeAllElements();
		// The counter must follow the vector, otherwise GetLength() keeps
		// reporting the pre-clear size.
		hereLength = 0;
	}

	/**
	 * Looks up an element by {@code equals}.
	 *
	 * @param x element to look for
	 * @return zero-based position from the head, or -1 when absent
	 */
	public static  int search(Object x) {
		return hereVector.indexOf(x);
	}

	/** @return the number of elements currently queued */
	public static  synchronized int GetLength() {
		return hereLength;
	}
}

/**
 * Minimal append-only file logger with count-based rotation.
 *
 * The log file is created in the working directory and named
 * {@code <prefix><year><month><day>.log} (month and day are not zero
 * padded). After more than {@link #WarningNum} lines have been written, the
 * logger switches to {@code <base>_<n>.log}, with {@code n} starting at 1.
 * If a file cannot be opened, messages are written to {@code System.err}
 * instead of being lost or raising an exception.
 */
class LogToFile{
	String logStr = "";
	/** Current sink; {@code null} when the log file could not be opened. */
	PrintWriter log;
	/** Path of the log file without extension. */
	String logFile = "";
	String extFile = "";
	/** Lines written since the current file was opened. */
	int countNum=0;
	/** Line count after which the logger rotates to a new file. */
	final int WarningNum=200000;
	/** Suffix number used for the next rotated file. */
	int a=1;

	/**
	 * Opens (in append mode) the log file for today's date.
	 *
	 * @param inLogFile file name prefix, relative to the working directory
	 */
	public LogToFile(String inLogFile){
		GregorianCalendar today = new GregorianCalendar( );
		logFile = "./"+inLogFile+today.get(Calendar.YEAR)+(1 + today.get(Calendar.MONTH))+today.get(Calendar.DATE);
		extFile = ".log";
		log = openWriter(logFile+extFile);
	}

	/**
	 * Writes one timestamped line. CR and LF inside the message are replaced
	 * by the letters {@code R} and {@code N} so each entry stays on one line.
	 *
	 * @param inStr message text; {@code null} is logged as "null"
	 */
	public synchronized void DoLog(String inStr)
	{
		String line = new Date() + " % " + String.valueOf(inStr).replace('\n','N').replace('\r','R') + " %end% ";
		if (log != null)
		{
			log.println(line);
		}
		else
		{
			// No file available: keep the message visible rather than
			// failing with a NullPointerException.
			System.err.println(line);
		}

		countNum++;
		if (countNum>WarningNum)
		{
			PrintWriter next = openWriter(logFile+"_"+a+extFile);
			// On failure the old sink is kept and rotation is retried on the
			// next call, because countNum is left above the threshold.
			if (next != null)
			{
				if (log != null)
				{
					log.close(); // release the previous file handle
				}
				log = next;
				countNum=0;
				a++;
			}
		}
	}

	/** Opens an auto-flushing appending writer, or returns null (after reporting) on failure. */
	private PrintWriter openWriter(String path)
	{
		try
		{
			return new PrintWriter(new FileWriter(path, true), true);
		}
		catch (IOException e)
		{
			System.err.println("无法打开日志文件: " + path);
			return null;
		}
	}
}

/*
here is the monitor resource control code
*/
// Bookkeeping object for a pool of worker threads: it holds static counters
// (ready / running / total / dead), the concurrency limits, and per-slot
// name and status arrays, all exposed through synchronized accessors.
//
// NOTE(review): this region is damaged. Three lines below (the `for` in the
// constructor, the `if` in ApplyNewThread and the `for` in GetDeadNum) are not
// valid Java; the text that belonged between their two halves appears to have
// been lost, presumably by markup stripping that removed everything between a
// '<' and a later '>'. The missing code must be restored from an intact copy
// before this file can compile. Do not infer behavior across those gaps.
class ResControlCenter{
	static int readyNum=0;
	static int runningNum=0;
	static int AllNum=0;
	static int DeadNum=0;
	static int CouldRunning=4;// initial setting: default 4, maximum 99
	static int MaxNum=9;// initial setting: default 9, maximum 99
	String[] nameAry;
	int[] status;
	
	// Overrides the static limits when the supplied values are greater than 1
	// and sizes the per-slot arrays to MaxNum+1 entries.
	public ResControlCenter(int inMaxNum,int inCouldRunNum)
	{
		if (inMaxNum>1)
		{
			MaxNum = inMaxNum;
		}
		if (inCouldRunNum>1)
		{
			CouldRunning =	inCouldRunNum;
		}

		nameAry = new String[(MaxNum+1)];
		status = new int[(MaxNum+1)];

		// NOTE(review): truncated line. The loop condition and body, the end of
		// the constructor and the start of a later method (one that takes or
		// declares `i` and compares something to 1) are missing here.
		for (int t=0;t1)
		{
			CouldRunning = CouldRunning-i;
		}
	}
	// Returns the static total-thread counter.
	synchronized public int GetAllNum(){
		return AllNum;
	}
	// Returns the static ready-thread counter.
	synchronized public int GetReadyNum(){
		return readyNum;
	}
	// Returns the static running-thread counter.
	synchronized public int GetRunningNum(){
		return runningNum;
	}
	// Returns the configured maximum number of slots.
	synchronized public int GetMaxNum(){
		return MaxNum;
	}
	// Returns the name stored for slot i (no bounds check is performed).
	synchronized public String GetThreadName(int i){
		return nameAry[i];
	}
	// NOTE(review): only the head of ApplyNewThread survives. The condition
	// below is truncated, and the statements after it use `i` and return
	// nothing, so they presumably belong to a different, void method that
	// blocks in wait() until a running slot is free -- confirm against an
	// intact copy.
	synchronized public boolean ApplyNewThread(){
		boolean ret = false;
		if (AllNum=CouldRunning){
			try{
			wait();
			}catch(Exception ea)
			{
			System.out.println("异常.");
	
			}
		}
		runningNum = runningNum+1;
		readyNum = readyNum-1;
		status[i] = 2;// mark the slot as Running
	}
	// NOTE(review): only the head of GetDeadNum survives. The `for` line is
	// truncated, and everything after it references names that are not declared
	// anywhere in the visible text (msg, msgLength, myNum, hereName,
	// hereLogFile, hereReceiveLog, hereMonitor, hereQueue, herepCMPPAPI, ...).
	// Judging by the closing "end of ReadQueueThread" comment and by
	// DaemonThread constructing ReadQueueThreada, the remainder is presumably
	// the tail of ReadQueueThreada.run(); its class header, fields and
	// constructor are missing.
	synchronized public int GetDeadNum(){
		int ret = -1;
		if (DeadNum>0)
		{
			for (int j=0;j140)
						{
							// clamp the payload length to 140
							msgLength=140;
						}

						// Copy the first msgLength bytes of the message (platform default
						// charset) into the send buffer.
						byte[] sMsgCon = new byte[msgLength];
						System.arraycopy( msg.getBytes(), 0, sMsgCon, 0, msgLength );
						byte[] ba = new byte[17];

						hereLogFile.DoLog( "Henry log: ReadQueueThreada before SendSingle, myNum:"+myNum+", thread "+hereName +" start working, sending "+msg );
						System.out.println( "Henry log: ReadQueueThreada before SendSingle, myNum:"+myNum+", thread "+hereName +" start working, sending "+msg );

						// Workers whose number is not a multiple of 3 send the message;
						// the others call CMPPDeliver instead.
						if ( myNum % 3 != 0 )
						{
							if( herepCMPPAPI.CMPPSendSingle( needReply, msgLevel,sServiceID, 
										msgFormat,sFeeType, sFeeCode,ba, ba,sSrcTermID, sDestTermID,
										msgLength, sMsgCon,sMsgID, (byte)0, null, (byte)0, (byte)0,(byte)0,(byte)0,null ) != 0 ) 
							{
								System.out.println( "ReadQueueThreada: myNum="+ myNum +
										". Fail to call CMPPSendSingle, error=" + herepCMPPAPI.GetErrCode() );
								hereReceiveLog.DoLog("线程"+hereName+"发送"+msg+"失败");
								//System.exit( 1 );
							}
							else
							{
								//System.out.println("Send is OVER!");
								hereLogFile.DoLog(";   "+countnum+"    ;守护线程发送"+msg+"成功");
							}//end of if for send sms
						}
						else
						{
							if( herepCMPPAPI.CMPPDeliver( 5, lpDeliverResp ) != 0 )
							{
								System.out.println( "ReadQueueThreada: myNum="+ myNum +
										". Fail to call CMPPDeliver, error=" + herepCMPPAPI.GetErrCode() );
								hereReceiveLog.DoLog("Thread "+hereName+"receive failed");
							}
							else
							{
								hereLogFile.DoLog(";   "+countnum+"    ;daemon thread succeed to receive msg.");
							}
						}

       			hereLogFile.DoLog("线程"+hereName+"开始一次工作"+msg+"完成。现共有"+hereMonitor.GetAllNum()+"个线程,其中READY的是"+hereMonitor.GetReadyNum()+"个,运行的是"+hereMonitor.GetRunningNum()+"个,队列长度是"+hereQueue.GetLength());
       		}//end of if empty

        	//this.sleep(60);
					// Pause after each iteration; a failure of the sleep is only logged.
					try 
					{
						sleep(60);
					}
					catch(Exception ee)
					{
						hereLogFile.DoLog(" 睡觉都会报错,切。");
					}
        	// Report this worker's running slot back to the monitor.
        	hereMonitor.SubRunningNum(myNum);

				}//end of try
				catch(Exception eb){
					//System.out.println(hereName+"遇见了"+hereStr+",出错中。");
					//eb.printStackTrace();
					// Per-iteration failure: report the slot back, log a warning and
					// keep looping.
	        hereMonitor.SubRunningNum(myNum);
					hereReceiveLog.DoLog("警告!!线程"+hereName+"工作"+msg+"出错。现共有"+hereMonitor.GetAllNum()+"个线程,其中READY的是"+hereMonitor.GetReadyNum()+"个,运行的是"+hereMonitor.GetRunningNum()+"个,队列长度是"+hereQueue.GetLength());
				}//end of try and catch

			}//end of while	
		}catch(Exception ebb){
     	//System.out.println(hereName+"遇见了"+hereStr+",出错中,这个线程已经死亡。");
	   	// Failure outside the loop: register this worker as dead with the
	   	// monitor and log it.
	   	hereMonitor.AddDeadOne(myNum);
	   	hereLogFile.DoLog("严重警告!!线程"+hereName+"死亡。现共有"+hereMonitor.GetAllNum()+"个线程,其中READY的是"+hereMonitor.GetReadyNum()+"个,运行的是"+hereMonitor.GetRunningNum()+"个");
		}
	}//end of run
}//end of ReadQueueThread

/*
here is start of thread ReadFromQueue.java
*/

/**
 * Supervisor thread for the queue workers.
 *
 * On start it launches one {@code ReadQueueThreada} worker for as long as the
 * monitor grants a new slot. It then loops forever: it restarts any worker the
 * monitor reports as dead, logs a status line, and raises or lowers the number
 * of workers allowed to run depending on the queue length. It sleeps 15
 * seconds between rounds and terminates only when interrupted.
 */
class DaemonThread extends Thread {
	Queue hereQueue;
	LogToFile hereLogFile = null;
	LogToFile threadLogFile = null;
	LogToFile ReceiveSMSLog = null;
	Object output = null;
	ResControlCenter hereMonitor = null;
	String logStr = new String();
	PrintWriter log;
	String logFile = new String();
	ReadQueueThreada[] myThread;
	/** Queue length at or above which two more workers are allowed to run. */
	final int MAXQUEUELENGTH = 500;
	/** Queue length at or below which two fewer workers are allowed to run. */
	final int MINQUEUELENGTH = 20;

	/**
	 * Creates the supervisor with its three log files and a fresh monitor.
	 *
	 * @param inQueue queue the workers read from
	 */
	public DaemonThread(Queue inQueue) {
		hereLogFile=new LogToFile("SDaemon");
		threadLogFile=new LogToFile("SThreadLog");
		ReceiveSMSLog=new LogToFile("RLog");
		hereQueue = inQueue;
		// Monitor limits: maximum of 5 thread slots, 3 allowed to run.
		hereMonitor = new ResControlCenter(5,3);
		myThread = new ReadQueueThreada[hereMonitor.GetMaxNum()];
		hereLogFile.DoLog("守护线程启动\n初始化成功");
	}//end of constructor

	/** Starts the workers, then supervises them until interrupted. */
	public void run() {
		int i=0;
		System.out.println("Henry log: Daemon thread running.");
		hereLogFile.DoLog( "Henry log: Daemon thread runs here!" );

		// Phase 1: start workers while the monitor hands out new slots.
		while (hereMonitor.ApplyNewThread())
		{
			try
			{
				myThread[i] = new ReadQueueThreada(hereQueue,hereMonitor,i,threadLogFile,ReceiveSMSLog);
				myThread[i].start();
				hereLogFile.DoLog("守护线程启动"+(i+1)+"个线程,成功!,队列长度是"+hereQueue.GetLength());
				i++;
			}
			catch(Exception eb)
			{
				hereLogFile.DoLog("警告!!守护线程启动到"+i+"个线程,失败!");
			}
		}

		// Phase 2: supervise.
		while (true)
		{
			i = hereMonitor.GetDeadNum();
			if (i>-1)
			{
				// Replace the dead worker in the same slot.
				try
				{
					myThread[i] = new ReadQueueThreada(hereQueue,hereMonitor,i,threadLogFile,ReceiveSMSLog);
					myThread[i].start();
					hereLogFile.DoLog("守护线程发现死亡线程"+i+",并且重新启动"+i+"个线程,成功!");
				}
				catch(Exception ea)
				{
					hereLogFile.DoLog("警告!!守护线程发现死亡线程"+i+",重新启动出错,失败!");
				}//end of try and catch
			}//end of if i

			hereLogFile.DoLog("线程状态报告:"+poolSummary());

			if (hereQueue.GetLength()>=MAXQUEUELENGTH)
			{
				hereLogFile.DoLog("一般警告:队列太长:"+poolSummary());
				hereMonitor.AddCouldRunNum(2);
				hereLogFile.DoLog(" 采取措施,增加两个可以运行线程数量。");
			}
			// "Too short" must be judged against the lower bound. Comparing
			// against MAXQUEUELENGTH here shrank the pool for every length up to
			// 500 and cancelled the increase made just above.
			if (hereQueue.GetLength()<=MINQUEUELENGTH)
			{
				hereLogFile.DoLog("一般警告:队列太短:"+poolSummary());
				hereMonitor.SubCouldRunNum(2);
				hereLogFile.DoLog(" 采取措施,减少两个可以运行线程数量。");
			}

			try
			{
				sleep(15000);
			}
			catch(InterruptedException ee)
			{
				// An interrupt is a request to stop: log it, restore the flag
				// and leave the loop instead of swallowing it.
				hereLogFile.DoLog(" 睡觉都会报错,切。");
				Thread.currentThread().interrupt();
				return;
			}
		}//end of while
	}//end of run

	/** Builds the shared "thread counts + queue length" fragment of the status log lines. */
	private String poolSummary() {
		return "现共有"+hereMonitor.GetAllNum()+"个线程,其中READY的是"+hereMonitor.GetReadyNum()+"个,运行的是"+hereMonitor.GetRunningNum()+"个,队列长度是"+hereQueue.GetLength();
	}

}//end of DaemonThread