public class Queue extends Destination implements QueueMBean
The Queue class implements the MOM queue behavior: it stores messages and delivers them upon client requests.
Modifier and Type | Class and Description |
---|---|
static class |
Queue.QueueFactory |
Modifier and Type | Field and Description |
---|---|
static String |
ARRIVAL_STATE_PREFIX |
protected QueueArrivalState |
arrivalState
Counter of message arrivals.
|
(package private) int |
cload |
(package private) static AgentId |
defaultDMQId
Static value holding the default DMQ identifier for a server.
|
(package private) static int |
defaultRedeliveryDelay
Static value holding the default redelivery delay for a server.
|
(package private) static int |
defaultThreshold
Static value holding the default threshold for a server.
|
(package private) int |
delayedMessageCount |
static String |
DELIVERY_TABLE_PREFIX |
private int |
deliveryDelay
The delivery delay in milliseconds used to wait before delivering a message.
|
protected QueueDeliveryTable |
deliveryTable
Table keeping the message deliveries
|
(package private) long |
hcons |
(package private) long |
hprod |
static org.objectweb.util.monolog.api.Logger |
logger |
static org.objectweb.util.monolog.api.Logger |
logmsg |
protected List<Message> |
messages
List holding the messages before delivery.
|
(package private) StringBuffer |
msgTxPrefix |
(package private) int |
msgTxPrefixLength |
protected int |
nbExpirations
Number of stored messages with an expiration date.
|
protected int |
nbMaxMsg
Maximum number of messages stored in the queue (-1 for no limit).
|
protected long |
nbMsgsDeniedSinceCreation |
private boolean |
pause |
(package private) int |
pload |
private int |
priority
Common priority value.
|
protected boolean |
receiving
true if the queue is currently handling a new received message. |
private int |
redeliveryDelay
The re-delivery delay, in seconds, to wait before re-delivering
messages after a deny.
|
protected List<ReceiveRequest> |
requests
List holding the requests before reply or expiry.
|
private boolean |
samePriorities
true if all the stored messages have the same priority. |
private static long |
serialVersionUID
define serialVersionUID for interoperability
|
private boolean |
syncExceptionOnFullDest
If true, an exception is thrown when a message is sent to a full destination.
|
private int |
threshold
Threshold above which messages are considered as undeliverable because
constantly denied; 0 stands for no threshold, -1 for value not set.
|
_rights, clients, creationDate, dmqId, freeReading, freeWriting, nbMsgsDeliverSinceCreation, nbMsgsSentToDMQSinceCreation, READ, READWRITE, strbuf, strictCounters, task, temporary, WRITE
agentProfiling, emptyString, fixed, logmon
BOOLEAN_ENCODED_SIZE, BYTE_ENCODED_SIZE, DOUBLE_ENCODED_SIZE, FLOAT_ENCODED_SIZE, INT_ENCODED_SIZE, LONG_ENCODED_SIZE, SHORT_ENCODED_SIZE
Modifier | Constructor and Description |
---|---|
|
Queue()
Creates a queue.
|
protected |
Queue(String name,
boolean fixed,
int stamp)
Creates a queue with a specified stamp.
|
Modifier and Type | Method and Description |
---|---|
protected void |
abortReceiveRequest(AgentId from,
AbortReceiveRequest not) |
private void |
acknowledge(String msgId) |
protected void |
acknowledgeRequest(AcknowledgeRequest not)
Method implementing the reaction to an
AcknowledgeRequest
instance, requesting messages to be acknowledged. |
void |
addClientMessages(ClientMessages clientMsgs,
boolean throwsExceptionOnFullDest)
Adds the client messages in the queue.
|
(package private) void |
addDeliveryTimeMessage(Message msg,
int clientCtx,
boolean throwsExceptionOnFullDest,
boolean isHeader) |
protected boolean |
addMessage(Message message,
boolean throwsExceptionOnFullDest)
Adds a message in the list of messages to deliver.
|
protected void |
agentSave()
Enables the sub-classes to save their state.
|
protected void |
browseRequest(AgentId from,
BrowseRequest not)
Method implementing the queue reaction to a
BrowseRequest
instance, requesting an enumeration of the messages on the queue. |
protected boolean |
checkDelivery(Message msg)
Returns true if conditions are ok to deliver the message.
|
void |
cleanPendingMessage()
Removes all messages whose time-to-live has expired.
|
protected DMQManager |
cleanPendingMessage(long currentTime)
Cleans the pending messages list.
|
void |
cleanWaitingRequest()
Removes all requests whose expiration time has passed.
|
protected void |
cleanWaitingRequest(long currentTime)
Cleans the waiting request list.
|
void |
clear()
Removes all pending messages.
|
private void |
clearQueue(AgentId replyTo,
String requestMsgId,
String replyMsgId) |
void |
decode(Decoder decoder)
Decodes the object.
|
private void |
deleteQueueMessage(DeleteQueueMessage request,
AgentId replyTo,
String requestMsgId,
String replyMsgId) |
protected void |
deliverMessages(int index)
Actually tries to answer the pending "receive" requests.
|
protected void |
denyRequest(AgentId from,
DenyRequest not)
Method implementing the reaction to a
DenyRequest
instance, requesting messages to be denied. |
protected void |
doClientMessages(AgentId from,
ClientMessages not,
boolean throwsExceptionOnFullDest)
Method specifically processing a
ClientMessages instance. |
protected void |
doDeleteNot(DeleteNot not)
Method specifically processing a
fr.dyade.aaa.agent.DeleteNot instance. |
protected void |
doRightRequest(AgentId user,
int right)
Method specifically processing a
SetRightRequest instance. |
protected void |
doUnknownAgent(UnknownAgent uA)
Method specifically processing an
UnknownAgent instance. |
void |
encode(Encoder encoder)
Encodes the object.
|
protected void |
finalize(boolean last)
Finalizes the destination before it is garbaged.
|
protected ClientMessages |
getClientMessages(int nb,
String selector,
boolean remove)
Gets a client message containing
nb messages. |
long |
getConsumerLoad()
Return the average consumer's load during last moments.
|
static AgentId |
getDefaultDMQId()
Static method returning the default DMQ identifier.
|
static int |
getDefaultRedeliveryDelay()
Static method returning the default redelivery delay for a server.
|
static int |
getDefaultThreshold()
Static method returning the default threshold.
|
int |
getDelayedMessageCount()
Returns the number of messages waiting for a delay.
|
int |
getDeliveredMessageCount()
Returns the number of messages delivered and waiting for acknowledge.
|
int |
getDeliveryDelay()
Returns the Queue deliveryDelay in milliseconds.
|
int |
getEncodableClassId()
Enables the sub classes not to implement this method.
|
int |
getEncodedSize()
Returns the size of the encoded object.
|
private static String |
getIdString(Message msg) |
CompositeData |
getMessage(String msgId)
Returns the description of a particular pending message.
|
TabularData |
getMessages()
Returns the description of all pending messages.
|
private List |
getMessages(int nb,
String selector,
boolean remove)
Gets messages, if possible.
|
List<? extends MessageView> |
getMessagesView() |
private Message |
getMomMessage(String msgId) |
protected StringBuffer |
getMsgTxPrefix() |
int |
getNbMaxMsg()
Returns the maximum number of messages for the destination.
|
long |
getNbMsgsDeliverSinceCreation()
Returns the number of messages delivered since creation time of this
destination.
|
long |
getNbMsgsDeniedSinceCreation()
Returns the number of messages denied since creation time of this
destination.
|
long |
getNbMsgsReceiveSinceCreation()
Returns the number of messages received since creation time of this
destination.
|
int |
getPendingMessageCount()
Returns the number of pending messages in the queue.
|
long |
getProducerLoad()
Return the average producer's load during last moments.
|
private void |
getQueueMessage(GetQueueMessage request,
AgentId replyTo,
String requestMsgId,
String replyMsgId) |
protected Message |
getQueueMessage(String msgId,
boolean remove)
Gets the MOM message; deletes it if remove is true.
|
private void |
getQueueMessageIds(AgentId replyTo,
String requestMsgId,
String replyMsgId) |
int |
getRedeliveryDelay()
Returns the delay, in seconds, to wait before re-delivering messages
after a deny.
|
protected Properties |
getStats() |
int |
getThreshold()
Returns the threshold value of this queue, -1 if not set.
|
byte |
getType()
Returns the type of this destination: Queue or Topic.
|
int |
getWaitingRequestCount()
Returns the number of waiting requests in the queue.
|
void |
handleAdminRequestNot(AgentId from,
FwdAdminRequestNot not) |
protected void |
handleExpiredNot(AgentId from,
ExpiredNot not) |
protected void |
initialize(boolean firstTime)
Initializes the destination.
|
boolean |
isPause() |
boolean |
isSyncExceptionOnFullDest() |
protected boolean |
isUndeliverable(Message message)
Returns
true if a given message is considered as undeliverable,
because its delivery count matches the queue's threshold, if any, or the
server's default threshold value (if any). |
protected boolean |
isValidJMXAttribute(String attrName)
Allows some JMX attributes to be excluded from the getJMXStatistics method.
|
protected void |
messageDelivered(String msgId)
Called in deliverMessages just after forward(msg);
override this method to apply specific processing.
|
protected void |
messageRemoved(String msgId)
Called in deliverMessages just after an (invalid) message is removed;
override this method to apply specific processing.
|
(package private) void |
processDeliveryTime(AgentId from,
QueueDeliveryTimeNot not) |
void |
react(AgentId from,
Notification not)
Distributes the received notifications to the appropriate reactions.
|
protected void |
receiveRequest(AgentId from,
ReceiveRequest not)
Method implementing the reaction to a
ReceiveRequest
instance, requesting a message. |
static void |
setDefaultRedeliveryDelay(int reDeliveryDelay) |
void |
setDeliveryDelay(int deliveryDelay)
Sets the Queue deliveryDelay in milliseconds.
|
protected void |
setMsgTxName(Message msg) |
void |
setNbMaxMsg(int nbMaxMsg)
Sets the maximum number of messages for the destination.
|
void |
setPause(boolean pause) |
void |
setProperties(Properties properties,
boolean firstTime)
Configures a
Queue instance. |
void |
setRedeliveryDelay(int redeliveryDelay)
Sets the delay, in seconds, to wait before re-delivering messages
after a deny.
|
void |
setSyncExceptionOnFullDest(boolean syncExceptionOnFullDest) |
void |
setThreshold(int threshold)
Sets or unsets the threshold for this queue.
|
protected void |
storeMessage(Message msg,
boolean throwsExceptionOnFullDest)
Actually stores a message in the deliverables list.
|
protected void |
storeMessageHeader(Message message,
boolean throwsExceptionOnFullDest)
Actually stores a message header in the deliverables list.
|
String |
toString()
Returns a string representation of this destination.
|
void |
wakeUpNot(WakeUpNot not)
Wakes up and cleans the queue.
|
agentFinalize, agentInitialize, clientMessages, delete, deleteNot, forward, getCreationDate, getCreationTimeInMillis, getDestinationId, getDMQAgentId, getDMQId, getJMXStatistics, getMBeanName, getNbMsgsSentToDMQSinceCreation, getPeriod, getRight, getRights, getRights, handleDeniedMessage, interceptorsAvailable, isAdministrator, isFreeReading, isFreeWriting, isLocal, isReader, isWriter, postProcess, preProcess, processAdminCommand, processInterceptors, processPause, processSetRight, processStartHandler, processStopHandler, replyToTopic, requestGroupNot, setAdminId, setFreeReading, setFreeWriting, setPeriod, setRight, unknownAgent
delete, delete, deploy, deploy, getAgentId, getCommitTime, getId, getLogTopic, getName, getReactNb, getReactTime, hasName, incWorkInProgress, isAgentProfiling, isDeployed, isFixed, isUpdated, needToBeCommited, resetCommitTime, resetReactTime, resetTimer, save, sendTo, sendTo, sendTo, setAgentProfiling, setName, setNoSave, setSave
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
delete, getCreationDate, getCreationTimeInMillis, getDestinationId, getDMQId, getName, getNbMsgsSentToDMQSinceCreation, getPeriod, getRight, getRights, isFreeReading, isFreeWriting, setFreeReading, setFreeWriting, setPeriod
getAgentId, getCommitTime, getReactNb, getReactTime, isAgentProfiling, isFixed, resetCommitTime, resetReactTime, resetTimer, setAgentProfiling
private static final long serialVersionUID
public static org.objectweb.util.monolog.api.Logger logger
public static org.objectweb.util.monolog.api.Logger logmsg
public static final String DELIVERY_TABLE_PREFIX
public static final String ARRIVAL_STATE_PREFIX
static AgentId defaultDMQId
private int threshold
static int defaultThreshold
private int redeliveryDelay
static int defaultRedeliveryDelay
private int deliveryDelay
private boolean pause
private boolean samePriorities
true if all the stored messages have the same priority.
Note: messages list is ordered by priorities, so we could test if
first and last message have the same priority.
protected int nbExpirations
private int priority
protected transient QueueDeliveryTable deliveryTable
protected transient QueueArrivalState arrivalState
protected List<ReceiveRequest> requests
protected transient boolean receiving
true if the queue is currently handling a new received message.
transient int delayedMessageCount
protected long nbMsgsDeniedSinceCreation
protected int nbMaxMsg
long hprod
long hcons
int pload
int cload
transient StringBuffer msgTxPrefix
transient int msgTxPrefixLength
private boolean syncExceptionOnFullDest
public Queue()
protected Queue(String name, boolean fixed, int stamp)
name - Name of topic.
fixed - If true, topic is fixed in memory.
stamp - Specific stamp for resulting agent.
public int getThreshold()
getThreshold
in interface QueueMBean
public void setThreshold(int threshold)
setThreshold
in interface QueueMBean
threshold - The threshold value to be set (-1 for unsetting previous value).
public static int getDefaultThreshold()
public static AgentId getDefaultDMQId()
public final int getRedeliveryDelay()
getRedeliveryDelay
in interface QueueMBean
public final void setRedeliveryDelay(int redeliveryDelay)
setRedeliveryDelay
in interface QueueMBean
redeliveryDelay - the redeliveryDelay to set
public static final int getDefaultRedeliveryDelay()
public static final void setDefaultRedeliveryDelay(int reDeliveryDelay)
public final int getDeliveryDelay()
getDeliveryDelay
in interface QueueMBean
public final void setDeliveryDelay(int deliveryDelay)
setDeliveryDelay
in interface QueueMBean
deliveryDelay - the deliveryDelay to set
public boolean isPause()
isPause
in interface QueueMBean
public void setPause(boolean pause)
setPause
in interface QueueMBean
public void setProperties(Properties properties, boolean firstTime) throws Exception
Configures a Queue instance.
setProperties
in class Destination
properties - The initial set of properties.
Exception
public void react(AgentId from, Notification not) throws Exception
react
in class Destination
from - agent sending notification
not - notification to react to
Exception
protected void agentSave() throws IOException
Agent
agentSave
in class Agent
IOException - if any error occurs.
public final void cleanWaitingRequest()
cleanWaitingRequest
in interface QueueMBean
protected void cleanWaitingRequest(long currentTime)
currentTime - The current time.
public final int getWaitingRequestCount()
getWaitingRequestCount
in interface QueueMBean
public byte getType()
Destination
getType
in interface DestinationMBean
getType
in class Destination
DestinationConstants.TOPIC_TYPE, DestinationConstants.QUEUE_TYPE, DestinationConstants.TEMPORARY
public final void cleanPendingMessage()
cleanPendingMessage
in interface QueueMBean
protected DMQManager cleanPendingMessage(long currentTime)
currentTime - The current time.
DMQManager which contains the expired messages. null if there wasn't any.
public final int getPendingMessageCount()
getPendingMessageCount
in interface QueueMBean
public final int getDeliveredMessageCount()
getDeliveredMessageCount
in interface QueueMBean
public final int getDelayedMessageCount()
getDelayedMessageCount
in interface QueueMBean
public final long getNbMsgsDeniedSinceCreation()
getNbMsgsDeniedSinceCreation
in interface QueueMBean
public long getNbMsgsDeliverSinceCreation()
Destination
getNbMsgsDeliverSinceCreation
in interface DestinationMBean
getNbMsgsDeliverSinceCreation
in class Destination
public long getNbMsgsReceiveSinceCreation()
Destination
getNbMsgsReceiveSinceCreation
in interface DestinationMBean
getNbMsgsReceiveSinceCreation
in class Destination
public final int getNbMaxMsg()
getNbMaxMsg
in interface QueueMBean
public void setNbMaxMsg(int nbMaxMsg)
setNbMaxMsg
in interface QueueMBean
nbMaxMsg - the maximum number of messages (-1 sets no limit).
protected void initialize(boolean firstTime) throws Exception
initialize
in class Destination
firstTime - true when first called by the factory
Exception
protected void finalize(boolean last)
finalize
in class Destination
last - true if the destination is deleted
public String toString()
toString
in interface AgentMBean
toString
in interface DestinationMBean
toString
in class Agent
public void wakeUpNot(WakeUpNot not)
wakeUpNot
in class Destination
public long getProducerLoad()
getProducerLoad
in interface QueueMBean
public long getConsumerLoad()
getConsumerLoad
in interface QueueMBean
protected boolean isValidJMXAttribute(String attrName)
isValidJMXAttribute
in class Destination
attrName - name of attribute to test.
protected void receiveRequest(AgentId from, ReceiveRequest not) throws AccessException
Method implementing the reaction to a ReceiveRequest instance, requesting a message.
This method stores the request and launches a delivery sequence.
AccessException - If the sender is not a reader.
protected void browseRequest(AgentId from, BrowseRequest not) throws AccessException
Method implementing the queue reaction to a BrowseRequest instance, requesting an enumeration of the messages on the queue.
The method sends a BrowseReply back to the client. Expired messages are sent to the DMQ.
AccessException - If the requester is not a reader.
protected void acknowledgeRequest(AcknowledgeRequest not)
Method implementing the reaction to an AcknowledgeRequest instance, requesting messages to be acknowledged.
private void acknowledge(String msgId)
protected void denyRequest(AgentId from, DenyRequest not)
Method implementing the reaction to a DenyRequest instance, requesting messages to be denied.
This method denies the messages and launches a delivery sequence. Messages considered as undeliverable are sent to the DMQ.
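The routing decision taken on a deny can be pictured with a small standalone sketch. It does not use the Joram classes: `DenySketch`, its method names, and the `>=` comparison are assumptions made for illustration, based only on the documented conventions for the threshold (0 for no threshold, -1 for value not set) and on the server's default threshold.

```java
/** Hypothetical, standalone illustration; not the Joram implementation. */
final class DenySketch {
    /**
     * Assumed rule: a queue threshold of 0 disables the check, a positive
     * value is used as is, and -1 (not set) falls back to the server default.
     */
    static boolean isUndeliverable(int deliveryCount, int queueThreshold, int defaultThreshold) {
        if (queueThreshold == 0) return false;        // no threshold for this queue
        int limit = queueThreshold > 0 ? queueThreshold : defaultThreshold;
        return limit > 0 && deliveryCount >= limit;   // ">=" is an assumption
    }

    /** Returns where a denied message would go in this sketch. */
    static String routeDenied(int deliveryCount, int queueThreshold, int defaultThreshold) {
        return isUndeliverable(deliveryCount, queueThreshold, defaultThreshold)
            ? "DMQ" : "REDELIVER";
    }
}
```

In this sketch a message denied three times on a queue whose threshold is 3 is routed to the DMQ, while the same message on a queue with threshold 0 stays eligible for redelivery.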
protected void abortReceiveRequest(AgentId from, AbortReceiveRequest not)
public void handleAdminRequestNot(AgentId from, FwdAdminRequestNot not)
private void getQueueMessageIds(AgentId replyTo, String requestMsgId, String replyMsgId)
private void getQueueMessage(GetQueueMessage request, AgentId replyTo, String requestMsgId, String replyMsgId)
private void deleteQueueMessage(DeleteQueueMessage request, AgentId replyTo, String requestMsgId, String replyMsgId)
public void clear()
clear
in interface QueueMBean
protected void doRightRequest(AgentId user, int right)
Method specifically processing a SetRightRequest instance.
When a reader is removed, and receive requests of this reader are still
on the queue, they are replied to by an ExceptionReply.
doRightRequest
in class Destination
user - The user whose right is modified.
right - The right modification.
protected void doClientMessages(AgentId from, ClientMessages not, boolean throwsExceptionOnFullDest) throws AccessException
Method specifically processing a ClientMessages instance.
This method stores the messages and launches a delivery sequence. It is used when a ClientMessages notification comes from a JMS client and/or from an LB notification in ClusterQueue (maybe addClientMessages should be used instead). It is also used in the old deprecated JMS bridge.
doClientMessages
in class Destination
AccessException
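The "store, then launch a delivery sequence" pattern can be sketched in isolation as follows. This is only an illustration under simplifying assumptions: `DeliverySketch` and its members are hypothetical names, messages and requesters are plain strings, and selectors, priorities, expiration, and the full-destination check are left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical, standalone illustration; not the Joram implementation. */
final class DeliverySketch {
    final Deque<String> messages = new ArrayDeque<>();  // messages waiting for delivery
    final Deque<String> requests = new ArrayDeque<>();  // requesters waiting for a message
    final List<String> replies = new ArrayList<>();     // "requester<-message" pairs sent

    /** Stores a message, then tries to answer pending requests. */
    void store(String msg) {
        messages.addLast(msg);
        deliver();
    }

    /** Stores a receive request, then tries to answer it. */
    void receive(String requester) {
        requests.addLast(requester);
        deliver();
    }

    /** Pairs waiting requests with stored messages, oldest first. */
    private void deliver() {
        while (!messages.isEmpty() && !requests.isEmpty()) {
            replies.add(requests.pollFirst() + "<-" + messages.pollFirst());
        }
    }
}
```

Running the same delivery step after both `store` and `receive` means the order in which a message and its request arrive does not matter.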
void addDeliveryTimeMessage(Message msg, int clientCtx, boolean throwsExceptionOnFullDest, boolean isHeader) throws AccessException
AccessException
void processDeliveryTime(AgentId from, QueueDeliveryTimeNot not) throws AccessException
AccessException
protected void doUnknownAgent(UnknownAgent uA)
Method specifically processing an UnknownAgent instance.
The specific processing is done when a QueueMsgReply was
sent to a requester which does not exist anymore. In that case, the
messages sent to this requester and not yet acknowledged are marked as
"denied" for delivery to another requester, and a new delivery sequence
is launched. Messages considered as undeliverable are removed and sent to
the DMQ.
doUnknownAgent
in class Destination
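The recovery step for a vanished requester can be sketched as follows. The class is a hypothetical, standalone illustration: `UnknownAgentSketch`, the `delivered` map, and the `pending` list are assumed names, messages are reduced to their identifiers, and the undeliverable check and DMQ forwarding are omitted.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, standalone illustration; not the Joram implementation. */
final class UnknownAgentSketch {
    /** msgId -> requester that received the message and has not acknowledged it. */
    final Map<String, String> delivered = new LinkedHashMap<>();
    /** Identifiers of messages available for delivery again. */
    final List<String> pending = new ArrayList<>();

    /** Moves every unacknowledged message of the vanished requester back to pending. */
    List<String> requeueFor(String vanishedRequester) {
        List<String> requeued = new ArrayList<>();
        Iterator<Map.Entry<String, String>> it = delivered.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, String> entry = it.next();
            if (entry.getValue().equals(vanishedRequester)) {
                String msgId = entry.getKey();  // read the key before removing the entry
                it.remove();
                pending.add(msgId);
                requeued.add(msgId);
            }
        }
        return requeued;
    }
}
```

After `requeueFor` returns, a real queue would launch a new delivery sequence so that the requeued messages can reach another requester.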
protected void doDeleteNot(DeleteNot not)
Method specifically processing a fr.dyade.aaa.agent.DeleteNot instance.
ExceptionReply replies are sent to the pending receivers,
and the remaining messages are sent to the DMQ and deleted.
doDeleteNot
in class Destination
protected final StringBuffer getMsgTxPrefix()
protected final void setMsgTxName(Message msg)
protected final void storeMessage(Message msg, boolean throwsExceptionOnFullDest) throws AccessException
msg - The message to store.
throwsExceptionOnFullDest - if true, an exception may be thrown when sending a message to a full destination
AccessException
protected final void storeMessageHeader(Message message, boolean throwsExceptionOnFullDest) throws AccessException
message - The message to store.
throwsExceptionOnFullDest - if true, an exception may be thrown when sending a message to a full destination
AccessException
public boolean isSyncExceptionOnFullDest()
public void setSyncExceptionOnFullDest(boolean syncExceptionOnFullDest)
syncExceptionOnFullDest - the syncExceptionOnFullDest to set
protected final boolean addMessage(Message message, boolean throwsExceptionOnFullDest) throws AccessException
message - the message to add.
throwsExceptionOnFullDest - if true, an exception may be thrown when sending a message to a full destination
AccessException - If syncExceptionOnFullDest is set and the queue is full
protected ClientMessages getClientMessages(int nb, String selector, boolean remove)
Gets a client message containing nb messages.
Only used in ClusterQueue.
nb - number of messages returned in ClientMessage.
selector - JMS selector
remove - delete all messages returned if true
private List getMessages(int nb, String selector, boolean remove)
nb - number of messages to return; -1 returns all messages.
selector - JMS selector.
protected Message getQueueMessage(String msgId, boolean remove)
msgId - message identification
remove - if true, delete the message
public CompositeData getMessage(String msgId) throws Exception
getMessage
in interface QueueMBean
msgId - The unique message's identifier.
Exception
MessageJMXWrapper
public TabularData getMessages() throws Exception
getMessages
in interface QueueMBean
Exception
MessageJMXWrapper
public List<? extends MessageView> getMessagesView()
protected void deliverMessages(int index)
The method may send QueueMsgReply replies to clients.
index - Index at which to start "browsing" the requests.
protected boolean checkDelivery(Message msg)
protected void messageDelivered(String msgId)
protected void messageRemoved(String msgId)
protected boolean isUndeliverable(Message message)
Returns true if a given message is considered as undeliverable,
because its delivery count matches the queue's threshold, if any, or the
server's default threshold value (if any).
public void addClientMessages(ClientMessages clientMsgs, boolean throwsExceptionOnFullDest) throws AccessException
clientMsgs - client message notification.
throwsExceptionOnFullDest - if true, an exception may be thrown when sending a message to a full destination
AccessException
protected void handleExpiredNot(AgentId from, ExpiredNot not)
protected Properties getStats()
getStats
in class Destination
public int getEncodableClassId()
Agent
getEncodableClassId
in interface Encodable
getEncodableClassId
in class Agent
public int getEncodedSize() throws Exception
Agent
getEncodedSize
in interface Encodable
getEncodedSize
in class Destination
Exception - if an error occurs
public void encode(Encoder encoder) throws Exception
Agent
encode
in interface Encodable
encode
in class Destination
encoder - the encoder
Exception - if an error occurs
Copyright © 2021 ScalAgent D.T. All rights reserved.