我气死了都!终于登上博客了!什么破网络!本来都不打算写了!不过这个问题已经总结了有一段时间了,就是生产者消费者模式,我是用professor上课的代码来分析的。下面我就贴上代码吧,在代码中有注释,就是对生产者和消费者的分析吧。我要抓紧时间,争取在这一周把XML搞定,现在任务比较重啊,下周期中考试了DataNetworkd的,MMD,XML已经看到一半了,还剩下一半希望能尽快搞定。真多内容啊,XML感觉记得东西太多了,烦死了。
以下代码例子是一对多的生产者消费者模式,有一个生产者,有多个消费者
Message.java: //对象,需要存放到集合中,是由生产者生产,由消费者消费的,并且在对象中需要指明该对象是由哪个消费者消费的,所以有成员变量CustomerID
public class Message
{
private int id;
private String message;
private Date timestamp;
private int consumerId; //需要指明该对象是由哪个消费者消费的,所以有成员变量CustomerID
public Message(int id, String message, Date timestamp, intconsumerId)
{
this.id = id;
this.message = message;
this.timestamp = timestamp;
this.consumerId= consumerId;
}
@Override
public String toString()
{
return "Message ID: " + id
+ " Timestamp: " + timestamp
+ " Message: " + message
+ " Consumer ID: " + getConsumerId();
}
public int getConsumerId()
{
return consumerId;
}
}
Producer.java: //生产者用线程表示
public class Producer implements Runnable
{
static final int MAXQUEUE = 5; //定义对大队列容量
private List<Message> messageList =new ArrayList<Message>();//集合用于存放生产者生产的对象
public void run()
{
while (true)
{
putMessage();
try
{
Thread.sleep(1000);
} catch (InterruptedException e)
{
}
}
}
private synchronized void putMessage()//生产者需要有这个方法用于生产对象,注意消费者也有一个方法用于消费对象,但是这个消费对象的方法定义在生产者类中!这是为什么呢?原因在于这两个方法都必须定义为synchronized,线程间(生产者和消费者)需要同步,所以定义在同一个类下才能同步。所以在下面会看到getMessage方法是消费者用于消费对象的,于是乎在消费者的类中,需要定义一个成员变量为生产者的引用,这样就可以调用这个getMessage方法了
{
while (messageList.size() >= MAXQUEUE)
//if(messageList.size() >= MAXQUEUE)//注意这里只能用while循环不能用if来判断,if判断只适合1对1的关系,就是只有2个线程,如果多个线程用if判断的话,就会出现下述情况:生产者首先因为messageList.size()等于5了,进入循环,wait后释放锁,消费者执行getMessage,因为对象都有自己的ConsumerId,当这个消费者发现队列的第0个元素不属于它的话,它不会消费掉这个对象,而是notifyAll()了,并返回空(在这种情况下返回,集合中的对象仍旧是5个),此时此刻若是在waitpool中的生产者进入到了lockpool中并且先获得了锁,那么生产者就从wait()处又开始运行了,由于是if,所以直接跳出去了,跳到了后面继续添加对象到集合中的那段代码(就是如下第11行代码处又开始运行添加对象了)。所以此处要用while循环。综上所述,在多线程中都用while来做判断,不用if了
{
try
{
notifyAll();
wait();
} catch (InterruptedException e)
{
e.printStackTrace();
}
}
String[] messages =
{
"Low on space!", "Need another Quad Core!", "HDD Failure!", "SocketError!"
};
Random rnd = new Random();
messageList.add(newMessage(rnd.nextInt(messages.length),messages[rnd.nextInt(messages.length)], new Date(), rnd.nextInt(3)+ 1));
System.out.println("Producer: Queue has " + messageList.size() + "messages...");
notify();
}
// called by Consumer
public synchronized Message getMessage(intconsumerId)//该方法由消费者调用,但是该方法放到了生产者类中,原因在于该方法需要和putMessage方法同步!需要给同一个对象上锁!所以在消费者对象中需要有一个成员变量为生产者的引用,才可以调用该方法
{
Message message = null;
while (messageList.size() == 0)
//if(messageList.size() == 0)//在多线程中都用while来做判断,不用if了
{
try
{
notifyAll();
wait();
} catch (InterruptedException e)
{
e.printStackTrace();
}
}
if (consumerId == messageList.get(0).getConsumerId())
{
return ((Message) messageList.remove(0));
}
notify();
return message;
}
public int getListSize()
{
return messageList.size();
}
}
NamedConsumer.java://消费者类,也用线程表示
public class NamedConsumer implements Runnable
{
private Producer producer;//因为消费者需要调用生产者的getMessage方法,所以需要持有生产者的引用,getMessage方法是synchronized的,和putMessage方法需同步,所以都放在了生产者类中定义了
private int id; //该id是用于给对象识别的,也是用来标识这个消费者对象的
private longsleepInterval;
private List<Message>msgList;//消费者中的这个集合是用于存放当前这个消费者对象所消费的Message对象
public NamedConsumer(int id, Producer producer, longsleepInterval)
{
this.producer = producer;
this.id = id;
this.sleepInterval = sleepInterval;
msgList = newArrayList<Message>();
}
public void run()
{
while (true)
{
Message message = producer.getMessage(id);//根据消费者ID来获得对象
if (message != null)//如果返回不为空,表示该消费者已经消费了集合(这个集合不是msgList,而是生产者中的集合,用于存放所有Message对象的) 中的第0个对象元素
{
msgList.add(message);//把消费了的Message对象放入到msgList集合中
//System.out.println("Consumer:" + id + " got message: n" + message + "nof " +producer.getListSize() + " messages...");
}
try
{
Thread.sleep(sleepInterval);
} catch (InterruptedException e)
{
e.printStackTrace();
}
}
}
@Override
public String toString()
{
String messageString = "";
for (Message m : msgList)
{
messageString += m + "n";
}
return messageString;
}
}
Main.java:
public class Main
{
public static void main(String[] args)
{
Producer producer = new Producer();
new Thread(producer).start();
NamedConsumer consumer1 = new NamedConsumer(1, producer,5000L);
Thread c1 = new Thread(consumer1);
c1.setDaemon(true);
c1.start();
NamedConsumer consumer2 = new NamedConsumer(2, producer,6000L);
Thread c2 = new Thread(consumer2);
c2.setDaemon(true);
c2.start();
NamedConsumer consumer3 = new NamedConsumer(3, producer,7000L);
Thread c3 = new Thread(consumer3);
c3.setDaemon(true);
c3.start();
BufferedReader in = new BufferedReader(newInputStreamReader(System.in));//输入任意键结束整个程序,结束原因在于main方法是一个主线程,这个主线程也一直在跑,且停止在了下面的in.read();这句话上。而其他的生产者线程和消费者线程则在run()方法里面跑着,当主线程输入东西执行了in.read()后,在finally块中执行了System.exit(0);结束了整个程序
try
{
System.out.println("Press a key to exit...");
in.read();
System.out.println("Exiting....");
in.close();
} catch (IOException ioe)
{
ioe.printStackTrace();
} finally
{
System.out.println("Consumer 1's messages: n" + consumer1 +"n");
System.out.println("Consumer 2's messages: n" + consumer2 +"n");
System.out.println("Consumer 3's messages: n" + consumer3 +"n");
System.exit(0);
}
}
}