From 9ff8b3390fb5175cf00cbdfac3f4055dbc6f4e30 Mon Sep 17 00:00:00 2001 From: Hui Lan Date: Mon, 11 Aug 2025 13:38:35 +0800 Subject: Why the message cannot be received? --- Code/download_and_map.py | 3 ++- Code/test_redis_publish2.py | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 34 insertions(+), 1 deletion(-) create mode 100644 Code/test_redis_publish2.py diff --git a/Code/download_and_map.py b/Code/download_and_map.py index 6372248..0e8f160 100644 --- a/Code/download_and_map.py +++ b/Code/download_and_map.py @@ -391,15 +391,16 @@ def publish(mapped_data_directory): redis_host = os.getenv('REDIS_HOST', '127.0.0.1') redis_password = os.getenv('REDIS_PASSWORD', '123456') r = redis.Redis(host=redis_host, port=6379, password=redis_password, db=0) + r.publish(REDIS_CHANNEL, json.dumps({'filename':'REDIS_START.txt', 'data':str(datetime.now())})) for fname in glob.glob('%s/*_quant.txt' % (mapped_data_directory.rstrip('/'))): try: file_basename = os.path.basename(fname) with open(fname, 'r') as f: data = f.read() r.publish(REDIS_CHANNEL, json.dumps({'filename':file_basename, 'data':data})) - time.sleep(1) except Exception as e: r.publish(REDIS_CHANNEL, json.dumps({'filename':'REDIS_ERROR.txt', 'data':str(e)})) + time.sleep(1) ## main diff --git a/Code/test_redis_publish2.py b/Code/test_redis_publish2.py new file mode 100644 index 0000000..eeda27d --- /dev/null +++ b/Code/test_redis_publish2.py @@ -0,0 +1,32 @@ +import redis +from configure import DAILY_MAP_NUMBER, MIN_FASTQ_FILE_SIZE, RNA_SEQ_INFO_FILE, DOWNLOADED_SRA_ID_LOG_FILE, IGNORED_SRA_ID_LOG_FILE, UPDATE_NETWORK_LOG_FILE, MAPPED_RDATA_DIR, RAW_RDATA_DIR, SALMON_MAP_RESULT_DIR, REDIS_CHANNEL +import json, glob, os, time +from datetime import datetime +def publish(mapped_data_directory): + redis_host = os.getenv('REDIS_HOST', '127.0.0.1') + #redis_host = os.getenv('REDIS_HOST', '0.0.0.0') + redis_password = os.getenv('REDIS_PASSWORD', '123456') + print(redis_password) + r = redis.Redis(host=redis_host, port=6379, password=redis_password, db=0) + r.publish(REDIS_CHANNEL, json.dumps({'filename':'REDIS_START.txt', 'data':str(datetime.now())})) + for fname in glob.glob('%s/*_quant.txt' % (mapped_data_directory.rstrip('/'))): + print(fname) + try: + file_basename = os.path.basename(fname) + with open(fname, 'r') as f: + data = f.read() + r.publish(REDIS_CHANNEL, json.dumps({'filename':file_basename, 'data':data})) + time.sleep(3) + except Exception as e: + print(f'ERROR {e}') + r.publish(REDIS_CHANNEL, json.dumps({'filename':'REDIS_ERROR.txt', 'data':str(e)})) + + +# after mapping is finished, move all resulting files to MAPPED_RDATA_DIR +if glob.glob('%s/*_quant.txt' % (SALMON_MAP_RESULT_DIR.rstrip('/'))) != []: + publish(SALMON_MAP_RESULT_DIR) + #cmd = 'mv %s/*_quant.txt %s' % (SALMON_MAP_RESULT_DIR.rstrip('/'), MAPPED_RDATA_DIR) + #os.system(cmd) + print('[download_and_map.py] Done. Check directory %s.' % (os.path.abspath(MAPPED_RDATA_DIR))) +else: + print('[download_and_map.py] No quant files to move.') -- cgit v1.2.1 From 9a4a23c89cdba6a8eebe4ee13cd55e6d3aed5b81 Mon Sep 17 00:00:00 2001 From: Hui Lan Date: Tue, 19 Aug 2025 14:25:07 +0800 Subject: Running the publisher as a seperate script seems solved the problem of not being able to receive messages. --- Code/download_and_map.py | 20 +++++++++++++++----- Code/publish_mapped_data.py | 33 +++++++++++++++++++++++++++++++++ Code/test_redis_publish.py | 4 ++-- Code/test_redis_publish2.py | 32 -------------------------------- 4 files changed, 50 insertions(+), 39 deletions(-) create mode 100644 Code/publish_mapped_data.py delete mode 100644 Code/test_redis_publish2.py diff --git a/Code/download_and_map.py b/Code/download_and_map.py index 0e8f160..2a0a157 100644 --- a/Code/download_and_map.py +++ b/Code/download_and_map.py @@ -392,6 +392,7 @@ def publish(mapped_data_directory): redis_password = os.getenv('REDIS_PASSWORD', '123456') r = redis.Redis(host=redis_host, port=6379, password=redis_password, db=0) r.publish(REDIS_CHANNEL, json.dumps({'filename':'REDIS_START.txt', 'data':str(datetime.now())})) + time.sleep(3) for fname in glob.glob('%s/*_quant.txt' % (mapped_data_directory.rstrip('/'))): try: file_basename = os.path.basename(fname) @@ -400,7 +401,7 @@ def publish(mapped_data_directory): r.publish(REDIS_CHANNEL, json.dumps({'filename':file_basename, 'data':data})) except Exception as e: r.publish(REDIS_CHANNEL, json.dumps({'filename':'REDIS_ERROR.txt', 'data':str(e)})) - time.sleep(1) + time.sleep(3) ## main @@ -421,6 +422,8 @@ if not last_session_finished(DOWNLOADED_SRA_ID_LOG_FILE): # last session not fin write_network_log_file(s, UPDATE_NETWORK_LOG_FILE) if age_of_file_in_hours(DOWNLOADED_SRA_ID_LOG_FILE) > 24: # add DONE to the log file anyway append_done(DOWNLOADED_SRA_ID_LOG_FILE) + download_log_file(DOWNLOADED_SRA_ID_LOG_FILE, 'DONE at %s\n' % (curr_time)) + reverse_lines(DOWNLOADED_SRA_ID_LOG_FILE, os.path.splitext(DOWNLOADED_SRA_ID_LOG_FILE)[0] + '_reversed.txt') kill_process('wget') sys.exit() @@ -450,14 +453,21 @@ if not os.path.isdir(MAPPED_RDATA_DIR): # after mapping is finished, move all resulting files to MAPPED_RDATA_DIR if glob.glob('%s/*_quant.txt' % (SALMON_MAP_RESULT_DIR.rstrip('/'))) != []: - publish(SALMON_MAP_RESULT_DIR) - cmd = 'mv %s/*_quant.txt %s' % (SALMON_MAP_RESULT_DIR.rstrip('/'), MAPPED_RDATA_DIR) + + try: + write_log_file('[download_and_map.py] Ready to publish quant files.', UPDATE_NETWORK_LOG_FILE) + publish(SALMON_MAP_RESULT_DIR) + time.sleep(5) + except Exception as e: + write_log_file('[download_and_map.py] Publish error %s.' % (str(e)), UPDATE_NETWORK_LOG_FILE) + + cmd = 'cp %s/*_quant.txt %s' % (SALMON_MAP_RESULT_DIR.rstrip('/'), MAPPED_RDATA_DIR) os.system(cmd) print('[download_and_map.py] Done. Check directory %s.' % (os.path.abspath(MAPPED_RDATA_DIR))) else: print('[download_and_map.py] No quant files to move.') - + write_log_file('[download_and_map.py] No quant files to move.', UPDATE_NETWORK_LOG_FILE) write_download_log_file(DOWNLOADED_SRA_ID_LOG_FILE, '%s\n' % ('\n'.join(map_list))) -write_download_log_file(DOWNLOADED_SRA_ID_LOG_FILE, 'DONE at %s\n' % (curr_time)) +download_log_file(DOWNLOADED_SRA_ID_LOG_FILE, 'DONE at %s\n' % (curr_time)) reverse_lines(DOWNLOADED_SRA_ID_LOG_FILE, os.path.splitext(DOWNLOADED_SRA_ID_LOG_FILE)[0] + '_reversed.txt') diff --git a/Code/publish_mapped_data.py b/Code/publish_mapped_data.py new file mode 100644 index 0000000..2bd655c --- /dev/null +++ b/Code/publish_mapped_data.py @@ -0,0 +1,33 @@ +import redis +from configure import DAILY_MAP_NUMBER, MIN_FASTQ_FILE_SIZE, RNA_SEQ_INFO_FILE, DOWNLOADED_SRA_ID_LOG_FILE, IGNORED_SRA_ID_LOG_FILE, UPDATE_NETWORK_LOG_FILE, MAPPED_RDATA_DIR, RAW_RDATA_DIR, SALMON_MAP_RESULT_DIR, REDIS_CHANNEL +import json, glob, os, time +from datetime import datetime +def publish(mapped_files): + redis_host = os.getenv('REDIS_HOST', '127.0.0.1') + redis_password = os.getenv('REDIS_PASSWORD', '123456') + print(redis_password) + r = redis.Redis(host=redis_host, port=6379, password=redis_password, db=0) + r.publish(REDIS_CHANNEL, json.dumps({'filename':'REDIS_START.txt', 'data':str(datetime.now())})) + for fname in mapped_files: + print(fname) + try: + file_basename = os.path.basename(fname) + with open(fname, 'r') as f: + data = f.read() + r.publish(REDIS_CHANNEL, json.dumps({'filename':file_basename, 'data':data})) + time.sleep(3) + except Exception as e: + print(f'ERROR {e}') + r.publish(REDIS_CHANNEL, json.dumps({'filename':'REDIS_ERROR.txt', 'data':str(e)})) + + +# after mapping is finished, move all resulting files to MAPPED_RDATA_DIR +mapped_files = glob.glob('%s/*_quant.txt' % (SALMON_MAP_RESULT_DIR.rstrip('/'))) +print(mapped_files) +publish(mapped_files) +if mapped_files != []: + cmd = 'mv %s/*_quant.txt %s' % (SALMON_MAP_RESULT_DIR.rstrip('/'), MAPPED_RDATA_DIR) + os.system(cmd) + print('[download_and_map.py] Done. Check directory %s.' % (os.path.abspath(MAPPED_RDATA_DIR))) +else: + print('[download_and_map.py] No quant files to move.') diff --git a/Code/test_redis_publish.py b/Code/test_redis_publish.py index 937833f..722e80e 100644 --- a/Code/test_redis_publish.py +++ b/Code/test_redis_publish.py @@ -8,7 +8,7 @@ def publish(number): r.publish(REDIS_CHANNEL, json.dumps({'filename':'a.txt', 'data':str(number)})) n = 0 -while True: +while n < 10: publish(n) - time.sleep(3) + time.sleep(2) n += 1 diff --git a/Code/test_redis_publish2.py b/Code/test_redis_publish2.py deleted file mode 100644 index eeda27d..0000000 --- a/Code/test_redis_publish2.py +++ /dev/null @@ -1,32 +0,0 @@ -import redis -from configure import DAILY_MAP_NUMBER, MIN_FASTQ_FILE_SIZE, RNA_SEQ_INFO_FILE, DOWNLOADED_SRA_ID_LOG_FILE, IGNORED_SRA_ID_LOG_FILE, UPDATE_NETWORK_LOG_FILE, MAPPED_RDATA_DIR, RAW_RDATA_DIR, SALMON_MAP_RESULT_DIR, REDIS_CHANNEL -import json, glob, os, time -from datetime import datetime -def publish(mapped_data_directory): - redis_host = os.getenv('REDIS_HOST', '127.0.0.1') - #redis_host = os.getenv('REDIS_HOST', '0.0.0.0') - redis_password = os.getenv('REDIS_PASSWORD', '123456') - print(redis_password) - r = redis.Redis(host=redis_host, port=6379, password=redis_password, db=0) - r.publish(REDIS_CHANNEL, json.dumps({'filename':'REDIS_START.txt', 'data':str(datetime.now())})) - for fname in glob.glob('%s/*_quant.txt' % (mapped_data_directory.rstrip('/'))): - print(fname) - try: - file_basename = os.path.basename(fname) - with open(fname, 'r') as f: - data = f.read() - r.publish(REDIS_CHANNEL, json.dumps({'filename':file_basename, 'data':data})) - time.sleep(3) - except Exception as e: - print(f'ERROR {e}') - r.publish(REDIS_CHANNEL, json.dumps({'filename':'REDIS_ERROR.txt', 'data':str(e)})) - - -# after mapping is finished, move all resulting files to MAPPED_RDATA_DIR -if glob.glob('%s/*_quant.txt' % (SALMON_MAP_RESULT_DIR.rstrip('/'))) != []: - publish(SALMON_MAP_RESULT_DIR) - #cmd = 'mv %s/*_quant.txt %s' % (SALMON_MAP_RESULT_DIR.rstrip('/'), MAPPED_RDATA_DIR) - #os.system(cmd) - print('[download_and_map.py] Done. Check directory %s.' % (os.path.abspath(MAPPED_RDATA_DIR))) -else: - print('[download_and_map.py] No quant files to move.') -- cgit v1.2.1 From 119738ccd59c5f5b3df415e6caf330f12aecdd40 Mon Sep 17 00:00:00 2001 From: Hui Lan Date: Tue, 26 Aug 2025 15:30:15 +0800 Subject: Use the correct function write log file --- Code/download_and_map.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Code/download_and_map.py b/Code/download_and_map.py index 2a0a157..dd063e1 100644 --- a/Code/download_and_map.py +++ b/Code/download_and_map.py @@ -455,18 +455,18 @@ if not os.path.isdir(MAPPED_RDATA_DIR): if glob.glob('%s/*_quant.txt' % (SALMON_MAP_RESULT_DIR.rstrip('/'))) != []: try: - write_log_file('[download_and_map.py] Ready to publish quant files.', UPDATE_NETWORK_LOG_FILE) + write_network_log_file('[download_and_map.py] Ready to publish quant files.', UPDATE_NETWORK_LOG_FILE) publish(SALMON_MAP_RESULT_DIR) time.sleep(5) except Exception as e: - write_log_file('[download_and_map.py] Publish error %s.' % (str(e)), UPDATE_NETWORK_LOG_FILE) + write_network_log_file('[download_and_map.py] Publish error %s.' % (str(e)), UPDATE_NETWORK_LOG_FILE) cmd = 'cp %s/*_quant.txt %s' % (SALMON_MAP_RESULT_DIR.rstrip('/'), MAPPED_RDATA_DIR) os.system(cmd) print('[download_and_map.py] Done. Check directory %s.' % (os.path.abspath(MAPPED_RDATA_DIR))) else: print('[download_and_map.py] No quant files to move.') - write_log_file('[download_and_map.py] No quant files to move.', UPDATE_NETWORK_LOG_FILE) + write_network_log_file('[download_and_map.py] No quant files to move.', UPDATE_NETWORK_LOG_FILE) write_download_log_file(DOWNLOADED_SRA_ID_LOG_FILE, '%s\n' % ('\n'.join(map_list))) download_log_file(DOWNLOADED_SRA_ID_LOG_FILE, 'DONE at %s\n' % (curr_time)) -- cgit v1.2.1 From ff6bbd1e94851e727e5e7166919aa22fffec6f9b Mon Sep 17 00:00:00 2001 From: Hui Lan Date: Tue, 26 Aug 2025 15:33:58 +0800 Subject: Remove the publish quant data stuff from download_and_map.py. Publish using publish_mapped_data.py. --- Code/download_and_map.py | 29 ++--------------------------- 1 file changed, 2 insertions(+), 27 deletions(-) diff --git a/Code/download_and_map.py b/Code/download_and_map.py index dd063e1..f7e7d94 100644 --- a/Code/download_and_map.py +++ b/Code/download_and_map.py @@ -19,12 +19,11 @@ import fnmatch import time import re import shutil -import redis import json from datetime import datetime ########################################################################################## -from configure import DAILY_MAP_NUMBER, MIN_FASTQ_FILE_SIZE, RNA_SEQ_INFO_FILE, DOWNLOADED_SRA_ID_LOG_FILE, IGNORED_SRA_ID_LOG_FILE, UPDATE_NETWORK_LOG_FILE, MAPPED_RDATA_DIR, RAW_RDATA_DIR, SALMON_MAP_RESULT_DIR, REDIS_CHANNEL +from configure import DAILY_MAP_NUMBER, MIN_FASTQ_FILE_SIZE, RNA_SEQ_INFO_FILE, DOWNLOADED_SRA_ID_LOG_FILE, IGNORED_SRA_ID_LOG_FILE, UPDATE_NETWORK_LOG_FILE, MAPPED_RDATA_DIR, RAW_RDATA_DIR, SALMON_MAP_RESULT_DIR ########################################################################################## def glob_files(directory, pattern): @@ -387,22 +386,6 @@ def reverse_lines(fname, fname2): f2.write(content) -def publish(mapped_data_directory): - redis_host = os.getenv('REDIS_HOST', '127.0.0.1') - redis_password = os.getenv('REDIS_PASSWORD', '123456') - r = redis.Redis(host=redis_host, port=6379, password=redis_password, db=0) - r.publish(REDIS_CHANNEL, json.dumps({'filename':'REDIS_START.txt', 'data':str(datetime.now())})) - time.sleep(3) - for fname in glob.glob('%s/*_quant.txt' % (mapped_data_directory.rstrip('/'))): - try: - file_basename = os.path.basename(fname) - with open(fname, 'r') as f: - data = f.read() - r.publish(REDIS_CHANNEL, json.dumps({'filename':file_basename, 'data':data})) - except Exception as e: - r.publish(REDIS_CHANNEL, json.dumps({'filename':'REDIS_ERROR.txt', 'data':str(e)})) - time.sleep(3) - ## main # For filtering RNA-seq data @@ -451,16 +434,8 @@ curr_time = datetime.now().strftime('%Y-%m-%d_%H%M') # append date info to newly if not os.path.isdir(MAPPED_RDATA_DIR): os.makedirs(MAPPED_RDATA_DIR) -# after mapping is finished, move all resulting files to MAPPED_RDATA_DIR +# after mapping is finished, copy all resulting files to MAPPED_RDATA_DIR if glob.glob('%s/*_quant.txt' % (SALMON_MAP_RESULT_DIR.rstrip('/'))) != []: - - try: - write_network_log_file('[download_and_map.py] Ready to publish quant files.', UPDATE_NETWORK_LOG_FILE) - publish(SALMON_MAP_RESULT_DIR) - time.sleep(5) - except Exception as e: - write_network_log_file('[download_and_map.py] Publish error %s.' % (str(e)), UPDATE_NETWORK_LOG_FILE) - cmd = 'cp %s/*_quant.txt %s' % (SALMON_MAP_RESULT_DIR.rstrip('/'), MAPPED_RDATA_DIR) os.system(cmd) print('[download_and_map.py] Done. Check directory %s.' % (os.path.abspath(MAPPED_RDATA_DIR))) -- cgit v1.2.1 From 7447e7d86240ef6e61574d53e819dc33ceab527e Mon Sep 17 00:00:00 2001 From: Hui Lan Date: Wed, 27 Aug 2025 18:52:43 +0800 Subject: Fix 'function download_log_file not found' error --- Code/download_and_map.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Code/download_and_map.py b/Code/download_and_map.py index f7e7d94..8c5c4a7 100644 --- a/Code/download_and_map.py +++ b/Code/download_and_map.py @@ -405,7 +405,7 @@ if not last_session_finished(DOWNLOADED_SRA_ID_LOG_FILE): # last session not fin write_network_log_file(s, UPDATE_NETWORK_LOG_FILE) if age_of_file_in_hours(DOWNLOADED_SRA_ID_LOG_FILE) > 24: # add DONE to the log file anyway append_done(DOWNLOADED_SRA_ID_LOG_FILE) - download_log_file(DOWNLOADED_SRA_ID_LOG_FILE, 'DONE at %s\n' % (curr_time)) + write_download_log_file(DOWNLOADED_SRA_ID_LOG_FILE, 'DONE at %s\n' % (curr_time)) reverse_lines(DOWNLOADED_SRA_ID_LOG_FILE, os.path.splitext(DOWNLOADED_SRA_ID_LOG_FILE)[0] + '_reversed.txt') kill_process('wget') sys.exit() @@ -444,5 +444,5 @@ else: write_network_log_file('[download_and_map.py] No quant files to move.', UPDATE_NETWORK_LOG_FILE) write_download_log_file(DOWNLOADED_SRA_ID_LOG_FILE, '%s\n' % ('\n'.join(map_list))) -download_log_file(DOWNLOADED_SRA_ID_LOG_FILE, 'DONE at %s\n' % (curr_time)) +write_download_log_file(DOWNLOADED_SRA_ID_LOG_FILE, 'DONE at %s\n' % (curr_time)) reverse_lines(DOWNLOADED_SRA_ID_LOG_FILE, os.path.splitext(DOWNLOADED_SRA_ID_LOG_FILE)[0] + '_reversed.txt') -- cgit v1.2.1