www.pudn.com > yidong.rar > TestSend.java
package sample;
import aiismg.jcmppapi30.CMPPAPI;
import aiismg.jcmppapi30.CMPPDeliverResp;
import java.io.*;
import java.net.*;
import java.util.Date;
import java.util.*;
public class TestSend {
public static void main( String argv[] ) throws IOException
{
long dd = 12;
int bc = 0;
String input;
LogToFile logToFile = new LogToFile("TestSend");
Queue myQueue = new Queue();
CMPPAPI herepCMPPAPI=new CMPPAPI();
if( herepCMPPAPI.InitCMPPAPI( "../config/javacmppc.ini" ) != 0 )
{
//System.out.println( "Fail to call InitCMPPAPI!" );
//System.exit( 1 );
logToFile.DoLog("Fail to call InitCMPPAPI!");
}
else
{
logToFile.DoLog("Success to call InitCMPPAPI!");
}
System.out.println("我准备向队列里面写入东西");
/*
myQueue.enq("QA");
myQueue.enq("QB");
myQueue.enq("QC");
myQueue.enq("QD");
myQueue.enq("QE");
myQueue.enq("QF");
myQueue.enq("QG");
myQueue.enq("QGG");
myQueue.enq("QH");
myQueue.enq("QI");
myQueue.enq("QJ");
myQueue.enq("QK");
myQueue.enq("QL");
myQueue.enq("QM");
myQueue.enq("QN");
myQueue.enq("QO");
myQueue.enq("QP");
myQueue.enq("QQ");
myQueue.enq("QR");
*/
while (bc<500)
{
myQueue.enq("QA");
myQueue.enq("QB");
myQueue.enq("QC");
myQueue.enq("QD");
myQueue.enq("QE");
myQueue.enq("QF");
myQueue.enq("QG");
myQueue.enq("QGG");
myQueue.enq("QH");
myQueue.enq("QI");
myQueue.enq("QJ");
myQueue.enq("QK");
myQueue.enq("QL");
myQueue.enq("QM");
myQueue.enq("QN");
myQueue.enq("QO");
myQueue.enq("QP");
myQueue.enq("QQ");
myQueue.enq("QR");
myQueue.enq("QS");
myQueue.enq("QT");
myQueue.enq("QU");
myQueue.enq("QV");
myQueue.enq("end");
bc++;
}
System.out.println("主进程队列处理完毕,队列长度是"+myQueue.GetLength());
DaemonThread daemonSendThreadA = new DaemonThread(myQueue);
daemonSendThreadA.setDaemon(true);
daemonSendThreadA.start();
System.out.println("主进程队列处理完毕等待守护进程工作");
try
{
daemonSendThreadA.join();
}
catch ( InterruptedException ex)
{
}
/*
while(daemonSendThreadA.isAlive() && bc<1500000)
{
if ( bc % 1000 == 0)
System.out.println("主进程等待守护进程中,第"+bc+"次");
bc = bc+1;
for (int aaa=0;aaa<20000;aaa++)
{
dd = 25*25*bc;
}
}
*/
System.out.println("主进程等待守护进程结束,它"+daemonSendThreadA.isAlive()+"活着");
System.exit(0);
}//end of main //LogToFile logToFile = new LogToFile("TestReceive");
}//end of class TestReceive
/*
here is the Queue Class code
*/
class Queue {
static Vector hereVector;
static int hereLength;
public Queue()
{
if (hereVector==null)
{
hereVector= new Vector();
hereLength = 0;
}
}
public static synchronized void enq(Object x) {
//super.addElement(x);
hereVector.add(x);
hereLength++;
}
public static synchronized Object deq() {
/* 队列若为空,引发EmptyQueueException异常 */
if( empty() )
{return null;}
Object x = hereVector.elementAt(0);
hereVector.removeElementAt(0);
hereLength--;
return x;
}
public static synchronized Object front() {
if( empty() )
{return null;}
return hereVector.elementAt(0);
}
public static boolean empty() {
return hereVector.isEmpty();
}
public static synchronized void clear() {
hereVector.removeAllElements();
}
public static int search(Object x) {
return hereVector.indexOf(x);
}
public static synchronized int GetLength() {
return hereLength;
}
}
class LogToFile{
String logStr = new String();
PrintWriter log;
String logFile = new String();
String extFile = new String();
int countNum=0;
final int WarningNum=200000;
int a=1;
public LogToFile(String inLogFile){
GregorianCalendar lpMyCalender = new GregorianCalendar( );
logFile = "./"+inLogFile+lpMyCalender.get(lpMyCalender.YEAR)+(1 + lpMyCalender.get(lpMyCalender.MONTH))+lpMyCalender.get(lpMyCalender.DATE);
//logFile = "./"+inLogFile+(1900+new Date().getYear())+"-"+(1+new Date().getMonth())+"-"+new Date().getDate();
extFile = ".log";
try {
log = new PrintWriter(new FileWriter(logFile+extFile, true), true);
}
catch (IOException e) {
System.err.println("无法打开日志文件: " + logFile);
}
}
public synchronized void DoLog(String inStr)
{
log.println(new Date() + " % " + inStr.replace('\n','N').replace('\r','R') + " %end% ");
countNum++;
if (countNum>WarningNum)
{
try
{
log = new PrintWriter(new FileWriter(logFile+"_"+a+extFile, true), true);
countNum=0;
a++;
}
catch (IOException e)
{
System.err.println("无法打开日志文件: " + logFile);
}
}
}
}
/*
here is the monitor resource control code
*/
class ResControlCenter{
static int readyNum=0;
static int runningNum=0;
static int AllNum=0;
static int DeadNum=0;
static int CouldRunning=4;//初始设置。默认4,最大99
static int MaxNum=9;//促使设置。默认9,最大99
String[] nameAry;
int[] status;
public ResControlCenter(int inMaxNum,int inCouldRunNum)
{
if (inMaxNum>1)
{
MaxNum = inMaxNum;
}
if (inCouldRunNum>1)
{
CouldRunning = inCouldRunNum;
}
nameAry = new String[(MaxNum+1)];
status = new int[(MaxNum+1)];
for (int t=0;t1)
{
CouldRunning = CouldRunning-i;
}
}
synchronized public int GetAllNum(){
return AllNum;
}
synchronized public int GetReadyNum(){
return readyNum;
}
synchronized public int GetRunningNum(){
return runningNum;
}
synchronized public int GetMaxNum(){
return MaxNum;
}
synchronized public String GetThreadName(int i){
return nameAry[i];
}
synchronized public boolean ApplyNewThread(){
boolean ret = false;
if (AllNum=CouldRunning){
try{
wait();
}catch(Exception ea)
{
System.out.println("异常.");
}
}
runningNum = runningNum+1;
readyNum = readyNum-1;
status[i] = 2;//写成Running状态
}
synchronized public int GetDeadNum(){
int ret = -1;
if (DeadNum>0)
{
for (int j=0;j140)
{
msgLength=140;
}
byte[] sMsgCon = new byte[msgLength];
System.arraycopy( msg.getBytes(), 0, sMsgCon, 0, msgLength );
byte[] ba = new byte[17];
hereLogFile.DoLog( "Henry log: ReadQueueThreada before SendSingle, myNum:"+myNum+", thread "+hereName +" start working, sending "+msg );
System.out.println( "Henry log: ReadQueueThreada before SendSingle, myNum:"+myNum+", thread "+hereName +" start working, sending "+msg );
if ( myNum % 3 != 0 )
{
if( herepCMPPAPI.CMPPSendSingle( needReply, msgLevel,sServiceID,
msgFormat,sFeeType, sFeeCode,ba, ba,sSrcTermID, sDestTermID,
msgLength, sMsgCon,sMsgID, (byte)0, null, (byte)0, (byte)0,(byte)0,(byte)0,null ) != 0 )
{
System.out.println( "ReadQueueThreada: myNum="+ myNum +
". Fail to call CMPPSendSingle, error=" + herepCMPPAPI.GetErrCode() );
hereReceiveLog.DoLog("线程"+hereName+"发送"+msg+"失败");
//System.exit( 1 );
}
else
{
//System.out.println("Send is OVER!");
hereLogFile.DoLog("; "+countnum+" ;守护线程发送"+msg+"成功");
}//end of if for send sms
}
else
{
if( herepCMPPAPI.CMPPDeliver( 5, lpDeliverResp ) != 0 )
{
System.out.println( "ReadQueueThreada: myNum="+ myNum +
". Fail to call CMPPDeliver, error=" + herepCMPPAPI.GetErrCode() );
hereReceiveLog.DoLog("Thread "+hereName+"receive failed");
}
else
{
hereLogFile.DoLog("; "+countnum+" ;daemon thread succeed to receive msg.");
}
}
hereLogFile.DoLog("线程"+hereName+"开始一次工作"+msg+"完成。现共有"+hereMonitor.GetAllNum()+"个线程,其中READY的是"+hereMonitor.GetReadyNum()+"个,运行的是"+hereMonitor.GetRunningNum()+"个,队列长度是"+hereQueue.GetLength());
}//end of if empty
//this.sleep(60);
try
{
sleep(60);
}
catch(Exception ee)
{
hereLogFile.DoLog(" 睡觉都会报错,切。");
}
hereMonitor.SubRunningNum(myNum);
}//end of try
catch(Exception eb){
//System.out.println(hereName+"遇见了"+hereStr+",出错中。");
//eb.printStackTrace();
hereMonitor.SubRunningNum(myNum);
hereReceiveLog.DoLog("警告!!线程"+hereName+"工作"+msg+"出错。现共有"+hereMonitor.GetAllNum()+"个线程,其中READY的是"+hereMonitor.GetReadyNum()+"个,运行的是"+hereMonitor.GetRunningNum()+"个,队列长度是"+hereQueue.GetLength());
}//end of try and catch
}//end of while
}catch(Exception ebb){
//System.out.println(hereName+"遇见了"+hereStr+",出错中,这个线程已经死亡。");
hereMonitor.AddDeadOne(myNum);
hereLogFile.DoLog("严重警告!!线程"+hereName+"死亡。现共有"+hereMonitor.GetAllNum()+"个线程,其中READY的是"+hereMonitor.GetReadyNum()+"个,运行的是"+hereMonitor.GetRunningNum()+"个");
}
}//end of run
}//end of ReadQueueThread
/*
here is start of thread ReadFromQueue.java
*/
class DaemonThread extends Thread {
//CMPPAPI hereCMPPAPI = null;
Queue hereQueue;
LogToFile hereLogFile = null;
LogToFile threadLogFile = null;
LogToFile ReceiveSMSLog = null;
Object output = null;
ResControlCenter hereMonitor = null;
String logStr = new String();
PrintWriter log;
String logFile = new String();
ReadQueueThreada[] myThread;
final int MAXQUEUELENGTH = 500;
final int MINQUEUELENGTH = 20;
public DaemonThread(Queue inQueue) {
//hereCMPPAPI = inpCMPPAPI;
hereLogFile=new LogToFile("SDaemon");
threadLogFile=new LogToFile("SThreadLog");
ReceiveSMSLog=new LogToFile("RLog");
hereQueue = inQueue;
hereMonitor = new ResControlCenter(5,3);//共开启15个线程,可以运行为5个
myThread = new ReadQueueThreada[hereMonitor.GetMaxNum()];
hereLogFile.DoLog("守护线程启动\n初始化成功");
}//end of 构造函数
public void run() { //线程的执行方法
int i=0;
int tmpCount=0;
System.out.println("Henry log: Daemon thread running.");
hereLogFile.DoLog( "Henry log: Daemon thread runs here!" );
while (hereMonitor.ApplyNewThread())
{
try
{
//myThread[i] = new ReadQueueThreada(hereQueue,hereMonitor,i,threadLogFile,ReceiveSMSLog,hereCMPPAPI);
myThread[i] = new ReadQueueThreada(hereQueue,hereMonitor,i,threadLogFile,ReceiveSMSLog);
myThread[i].start();
hereLogFile.DoLog("守护线程启动"+(i+1)+"个线程,成功!,队列长度是"+hereQueue.GetLength());
i++;
}
catch(Exception eb)
{
hereLogFile.DoLog("警告!!守护线程启动到"+i+"个线程,失败!");
}
}
while (true)
{
i = hereMonitor.GetDeadNum();
if (i>-1)
{
try
{
//myThread[i] = new ReadQueueThreada(hereQueue,hereMonitor,i,threadLogFile,ReceiveSMSLog,hereCMPPAPI);
myThread[i] = new ReadQueueThreada(hereQueue,hereMonitor,i,threadLogFile,ReceiveSMSLog);
myThread[i].start();
hereLogFile.DoLog("守护线程发现死亡线程"+i+",并且重新启动"+i+"个线程,成功!");
}
catch(Exception ea)
{
hereLogFile.DoLog("警告!!守护线程发现死亡线程"+i+",重新启动出错,失败!");
}//end of try and catch
}//end of if i
hereLogFile.DoLog("线程状态报告:现共有"+hereMonitor.GetAllNum()+"个线程,其中READY的是"+hereMonitor.GetReadyNum()+"个,运行的是"+hereMonitor.GetRunningNum()+"个,队列长度是"+hereQueue.GetLength());
if (hereQueue.GetLength()>=MAXQUEUELENGTH)
{
hereLogFile.DoLog("一般警告:队列太长:现共有"+hereMonitor.GetAllNum()+"个线程,其中READY的是"+hereMonitor.GetReadyNum()+"个,运行的是"+hereMonitor.GetRunningNum()+"个,队列长度是"+hereQueue.GetLength());
hereMonitor.AddCouldRunNum(2);
hereLogFile.DoLog(" 采取措施,增加两个可以运行线程数量。");
}
if (hereQueue.GetLength()<=MAXQUEUELENGTH)
{
hereLogFile.DoLog("一般警告:队列太短:现共有"+hereMonitor.GetAllNum()+"个线程,其中READY的是"+hereMonitor.GetReadyNum()+"个,运行的是"+hereMonitor.GetRunningNum()+"个,队列长度是"+hereQueue.GetLength());
hereMonitor.SubCouldRunNum(2);
hereLogFile.DoLog(" 采取措施,减少两个可以运行线程数量。");
}
try
{
sleep(15000);
}
catch(Exception ee)
{
hereLogFile.DoLog(" 睡觉都会报错,切。");
}
}//end of while
}//end of run
}//end of DaemonThread