Unable to clear Ready to Process queue processor records
Pega 8.8.2. Our stream node will not join our cluster, which is preventing millions of queue records from being processed and is breaking our interaction portal: the data page executed in the OOTB rule SystemHealthOverview1 fails to load. We have disabled that step for now, but we still need to resolve the queue processor issue. Every time we restart the cluster, this node goes down immediately with the error:
[0.001s][error][logging] Error opening log file '/usr/share/tomcat/kafka- Proprietary information hidden/logs/kafkaServer-gc.log': No space left on device
We would like to remove all Ready to Process items from our custom queue processors so that the node does not immediately run out of disk space, but the script found in other articles does not work for this version of Pega.
The Java below returns the error:
The method withIdempotentSave(boolean) is undefined for the type StreamDataSetBuilder
// Build a handle to the queue processor's underlying Kafka-backed stream data set.
java.util.ArrayList<String> partitionKeys = new java.util.ArrayList<>();
partitionKeys.add(".pzMessageContent.pzInsKey");
com.pega.dsm.dnode.api.dataset.DataSet dataset = com.pega.dsm.dnode.api.dataset.stream.StreamDataSet.builder()
    .withClassName("System-Message-QueueProcessor")
    .withTopicName("RESOLVEAGEDINBOUNDWLCASE")
    .withPartitionKeys(partitionKeys)
    .withIdempotentSave(true) // <-- fails to compile on 8.8.2: "The method withIdempotentSave(boolean) is undefined"
    .withOperation(com.pega.dsm.dnode.api.dataset.operation.SaveOperation.NAME)
    .withOperation(com.pega.dsm.dnode.impl.dataset.kafka.KafkaGetTopicsOperation.NAME)
    .withOperation(com.pega.dsm.dnode.api.dataset.operation.BrowseOperation.NAME)
    .withOperation(com.pega.dsm.dnode.impl.dataset.kafka.KafkaDropOperation.NAME)
    .withOperation(com.pega.dsm.dnode.impl.dataset.kafka.KafkaTruncateOperation.NAME)
    .withSerde(com.pega.dsm.kafka.api.serde.ClipboardPageStreamSerde.create())
    .build(tools);
// Look up and invoke the truncate operation to purge the topic.
com.pega.dsm.dnode.api.dataset.operation.Operation operation = dataset.getOperationByName(com.pega.dsm.dnode.impl.dataset.kafka.KafkaTruncateOperation.NAME);
((com.pega.dsm.dnode.impl.dataset.kafka.KafkaTruncateOperation) operation).truncate().await(tools);
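Since the compile error only complains about withIdempotentSave(boolean), one variant worth trying is the same chain with that single call removed. This is an untested sketch against Pega's internal (proprietary) API: it assumes the rest of the 8.8.2 builder chain is unchanged, and it cannot be compiled outside a Pega node.

```
// Sketch only: identical to the snippet above, minus the call that
// does not exist on the 8.8.2 StreamDataSetBuilder.
com.pega.dsm.dnode.api.dataset.DataSet dataset = com.pega.dsm.dnode.api.dataset.stream.StreamDataSet.builder()
    .withClassName("System-Message-QueueProcessor")
    .withTopicName("RESOLVEAGEDINBOUNDWLCASE")
    .withPartitionKeys(partitionKeys)
    // .withIdempotentSave(true)  // removed: undefined in this version
    .withOperation(com.pega.dsm.dnode.impl.dataset.kafka.KafkaTruncateOperation.NAME)
    .withSerde(com.pega.dsm.kafka.api.serde.ClipboardPageStreamSerde.create())
    .build(tools);
```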
I have also tried importing the DELAYEDITEMSDATAFLOWSERVICE data instance from a working environment to see if that was the issue, but it had no impact.
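Since the immediate failure is "No space left on device", it may help to confirm what is actually consuming the disk under the Kafka directory before any truncation attempt. A minimal sketch (KAFKA_DIR is a placeholder, not our real hidden path):

```shell
# Placeholder path: substitute the real Kafka directory from the error message.
KAFKA_DIR="${KAFKA_DIR:-/var/tmp}"

# Free space on the filesystem holding the Kafka data.
df -h "$KAFKA_DIR"

# Ten largest entries below it; on a stream node this usually points at
# topic partition logs or rotated kafkaServer-gc.log files.
du -sk "$KAFKA_DIR"/* 2>/dev/null | sort -rn | head -n 10
```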
Can you suggest how to resolve this error, or the underlying issue overall?