getMessagesFromMqTopic

Retrieve messages published on a specific topic

This action uses the event streaming technique to retrieve messages published on a specific topic by the MQ API and MQTT. It is stateless and does not remove messages from the topic's queue. Use it to tail the end of the queue, retrieve all messages from the beginning of the queue, retrieve messages starting from a timestamp, or paginate through a queue. 

This action allows multiple processes to retrieve messages simultaneously from a topic without moving the client's message position or removing shared messages.

Instead of using a client's message position to retrieve undelivered messages, it retrieves messages from a specified starting location in the topic's message queue. You can retrieve messages from the end or beginning of the queue, a specific timestamp, or a queue position ID.

This action does not disconnect MQTT clients. 

Call this action to retrieve the "maxMessages" number of messages from a topic starting from the position specified by "startFrom". The message payload is encoded according to "binaryFormat"

You must use one but not both of the following properties to identify the message queue from which you want to retrieve messages: 

  • Use "topic" to specify a topic from which you want to retrieve messages. A topic cannot contain wildcards.
  • Use "tableName" to specify the name of an integration table containing messages. You may also set the table's "databaseName" and "ownerName"; otherwise, the server will use the JSON Action Session's default values.

The optional "startFrom" property controls the starting point where the server delivers messages. 

  • Set "startFrom" to the default of ["tail"] to retrieve the most recently sent messages.
  • Set "startFrom" to "head" to retrieve messages from the beginning. 
  • Set "startFrom" to an ISO 8601 DateTime, such as "2024-07-11T21:18:01.000", to retrieve messages starting on or after that date and time. If the time component is missing, it defaults to "00:00:00.000"
  • Set "startFrom" to an integer number, such as 52233, to retrieve messages starting with the message with that ID number. 

Tip

To retrieve the next set of messages, set "startFrom" to one plus the ID number of the last message you received.

Optionally, set "expiredMessagesFilter" to "include", "exclude", or "only" to include or exclude expired messages from the results. Use "only" to include only expired messages in the results. It defaults to "include".

Optionally, set "includeMqttProperties": true to include the message's MQTT properties. It defaults to false.

Optionally, set "includeTransformFields": true to include the custom fields used for the transformation process's inputs and outputs. It defaults to false.

Tip

For more query options, use the JSON DB "getRecords..." actions. You can use the "describeMqSession" to return information about a client's subscriptions, which includes a map of topic names to table names. You can query those tables to see their messages. You can use SQL to run ad hoc queries. You can find a record using partial, exact, or inequality matches. You can move and skip forward and backward through the records in index-sorted order.

 

Request examples

Minimal

{
  "action": "getMessagesFromMqTopic",
  "params": {
    "topic": "test/topic1"
  },
  "authToken": "replaceWithAuthTokenFromCreateSession"
}
 
 

Maximal

{
  "api": "mq",
  "action": "getMessagesFromMqTopic",
  "params": {
    "topic": "test/topic1",
    "databaseName": "faircom",
    "ownerName": "admin",
    "tableName": "mqtt_msg_test_topic1",
    "startFrom": "head",
    "maxMessages": 20,
    "binaryFormat": "hex",
    "expiredMessagesFilter": "include",
    "includeMessageMetadata": true,
    "includeMqttProperties": true,
    "includeTransformFields": true,
    "includeOtherFields": true
  },
  "responseOptions": {
    "dataFormat": "arrays", 
    "numberFormat": "number",
    "binaryFormat": "base64",
    "variantFormat": "json"
  },
  "apiVersion": "1.0",
  "debug": "max",
  "requestId": "optionalUniqueRequestIdFromTheClient",
  "authToken": "replaceWithAuthTokenFromCreateSession"
}
 
 

Retrieve the most recently sent message to topic1 with all metadata and the message payload formatted as JSON

{
  "api": "mq",
  "action": "getMessagesFromMqTopic",
  "params":
  {
    "maxMessages": 1,
    "binaryFormat": "json",

    "topic": "test/topic1",
    "expiredMessagesFilter": "include",

    "includeMessageMetadata": true,
    "includeMqttProperties": true,
    "includeTransformFields": true
  },
  "requestId": "optionalUniqueRequestIdFromTheClient",
  "authToken": "replaceWithAuthTokenFromCreateSession"
}
 
 

Retrieve the last 10 messages sent to topic1 with the message payload formatted as base64 and no metadata

{
  "api": "mq",
  "action": "getMessagesFromMqTopic",
  "params":
  {
    "topic": "test/topic1",

    "startFrom": "tail",
    "maxMessages": 10,
    "binaryFormat": "base64"
  },
  "requestId": "optionalUniqueRequestIdFromTheClient",
  "authToken": "replaceWithAuthTokenFromCreateSession"
}
 
 

Retrieve the last message sent to all topics, format the message payload as hexadecimal, and include all MQTT properties

{
 "api": "mq",
 "action": "getMessagesFromMqTopic",
 "params":
 {
   "topic": "#",
   "maxMessages": 1,
   "binaryFormat": "hex",

   "includeMessageMetadata": true,
   "includeExpiredMessages": true,
   "includeMqttProperties": true,
   "includeTransformFields": true
 },
   
    "authToken": "replaceWithAuthTokenFromCreateSession"
}
 
 

Retrieve the first 50 non-purged messages sent to topic1 with the message payload formatted as JSON

{
  "api": "mq",
  "action": "getMessagesFromMqTopic",
  "params":
  {
    "topic": "test/topic1",
    "startFrom": "head",
    "maxMessages": 50,
    "binaryFormat": "json"
  },
  "authToken": "replaceWithAuthTokenFromCreateSession"
}
 
 

Retrieve the next 50 non-purged messages sent to topic1

{
  "api": "mq",
  "action": "getMessagesFromMqTopic",
  "params":
  {
    "topic": "test/topic1",
    "startFrom": 51,
    "maxMessages": 50,
    "binaryFormat": "json"
  },
  "authToken": "replaceWithAuthTokenFromCreateSession"
}
 
 

Retrieve 5000 messages sent to topic1 on or after "2024-07-11T21:18:01.000"

{
  "api": "mq",
  "action": "getMessagesFromMqTopic",
  "params":
  {
    "topic": "test/topic1",
    "startFrom": "2024-07-11T21:18:01.000",
    "maxMessages": 5000,
    "binaryFormat": "json"
  },
  "authToken": "replaceWithAuthTokenFromCreateSession"
}
 
 

 

Response example

Success

{
  "authToken": "authToken",
  "result": {
    "messages": [
    ]
  },
  "requestId": "2",
}
 
 

 

Properties

Request properties ("params")

Property Description Default Type Limits (inclusive)

binaryFormat

The "binaryFormat" property designates the format of binary values embedded in JSON strings. For more details, see "binaryFormat" Optional with default of "hex" string One of the following: "base64", "hex", or "byteArray".

databaseName

The "databaseName" property specifies the database that contains an object, such as a table or code package. If it is set to null or is omitted, it defaults to the default database of the JSON Action session, see "createSession" and the "defaultDatabaseName" property. 

You specify this property when you want to use a different database instead of the default. 

This property is useful because objects, such as tables and code packages, can have the same name in multiple databases. This feature allows you to create multiple environments in the same server and reuse the same JSON actions in each environment. For example, you can create "dev", "test", "stage", and "prod" databases on the same server and use the "defaultDatabaseName" or "databaseName" properties to specify the desired environment.

It is an error to set "databaseName" to the empty string "".

If no default database is specified during "createSession", the server sets the "defaultDatabaseName" to the "defaultDatabaseName" value specified in the services.json file.

Defaults to the session's "defaultDatabaseName" property string 1 to 64 bytes

expiredMessagesFilter

The "expiredMessagesFilter" property indicates whether the response should include expired messages. 

  • "include" returns expired messages in the results.
  • "exclude" omits expired messages from the results.
  • "only" returns only expired messages in the results.
Optional with default of "include" string

"include"

"exclude"

"only"

includeExpiredMessages

The "includeExpiredMessages" property includes expired messages in a subscription when set to true. A message has expired when the "messageHasExpired" property in the response is true.

 

Optional with default of true Boolean

true

false

includeMessageMetadata

The "includeMessageMetadata" property includes the message's "id", "timestamp", "error", and "log" properties when set to true.

Optional with default of false Boolean

true

false

includeMqttProperties

The "includeMqttProperties" property includes the MQTT properties in the response when set to true. Otherwise, omit the property or set it to false or null. MQTT properties are helpful when you need to troubleshoot MQTT messages.

Optional with default of false Boolean

true

false

includeOtherFields

The "includeOtherFields" property includes additional fields that are not the built-in integration table fields, transform fields, and MQTT fields when set to true. Otherwise, omit the property or set it to false or null.

Optional with default of false Boolean

true

false

includeTransformFields

The "includeTransformFields" property includes custom transform fields (if any) that a customer added to the integration table to when set to true. Otherwise, omit the property or set it to false or null. Custom transform fields are helpful when you want to see the outputs and inputs of transformations assigned to the integration table.

Optional with default of false Boolean

true

false

maxMessages

The "maxMessages" property specifies the number of messages to return from a "getMessages…" action. The number of returned messages will be equal to or less than this maximum.

Optional with default of 20 integer 1 to 100000

ownerName

The "ownerName" property specifies the account that owns an object, such as a table or code package. See "createSession" and the "defaultOwnerName" property for more details. 

You specify this property when you want to use a different account instead of the default. Your session's account must have the appropriate privileges to access the code package. 

This property is useful because objects, such as tables and code packages, can have the same name in the same database as long as different accounts own each object. This feature allows you to create duplicate objects for different users on the same server and reuse the same JSON actions on those objects. For example, an administrator can copy objects from a production environment to her account so she can troubleshoot an issue using the same JSON actions, JavaScript, and SQL code.

It is an error to set "ownerName" to the empty string "".

If no default owner is specified during "createSession", the server sets the "defaultOwnerName" to the "defaultOwnerName" value specified in the services.json file.

Optional with default of the session's "defaultOwnerName" property string 1 to 64 bytes

startFrom

The "startFrom" property sets the cursor to a known position.

 

In "getRecordsByTable" and "getRecordsFromCursor" only, when "startFrom": "id" or "startFrom": "bookmark", the user will receive records starting from the value specified by the "id" or "bookmark" property. 

Optional with default of "currentPosition" string

"currentPosition"

"beforeFirstRecord"

"afterLastRecord"

"id"

"bookmark"

tableName

The "tableName" property contains the unique, user-defined name of a table.

See table name in System limits for the table naming requirements and limitations.

 

"params": {
  "tableName": "ctreeTable"
}
Required - No default value string 1 to 64 bytes

topic

The "topic" property specifies the unique, user-defined name for the topic.

  • The "topic" name is typically less than 150 characters.
  • The "topic" name will be used in all subsequent references to the topic, such as when re-configuring or deleting the topic.

 

Optional with default of "" string 1 to 65,500 bytes

 

Response properties ("result")

Property Description Type Limits (inclusive)

messages

The "messages" property specifies the message to be published to MQ as well as information about the message.

array of objects

1 or more objects including some or all of the following properties:

"binaryFormat"

"contentType"

"correlationData"

"expirySeconds"

"payload"

"responseTopic"

"retain"

"userProperties"