From ffa66a6971010057a5918ddc54531bec7bf18842 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Thu, 30 Jan 2025 15:58:20 -0800 Subject: [PATCH 1/9] fix speed of --patch-from at high compression mode --- lib/compress/zstd_compress.c | 69 ++++++++++++++++++---------------- lib/compress/zstdmt_compress.c | 4 +- 2 files changed, 38 insertions(+), 35 deletions(-) diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index b755c6bc223..bd7667d4a96 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -3117,7 +3117,7 @@ ZSTD_BlockCompressor_f ZSTD_selectBlockCompressor(ZSTD_strategy strat, ZSTD_Para ZSTD_STATIC_ASSERT((unsigned)ZSTD_fast == 1); assert(ZSTD_cParam_withinBounds(ZSTD_c_strategy, (int)strat)); - DEBUGLOG(4, "Selected block compressor: dictMode=%d strat=%d rowMatchfinder=%d", (int)dictMode, (int)strat, (int)useRowMatchFinder); + DEBUGLOG(5, "Selected block compressor: dictMode=%d strat=%d rowMatchfinder=%d", (int)dictMode, (int)strat, (int)useRowMatchFinder); if (ZSTD_rowMatchFinderUsed(strat, useRowMatchFinder)) { static const ZSTD_BlockCompressor_f rowBasedBlockCompressors[4][3] = { { @@ -3141,7 +3141,7 @@ ZSTD_BlockCompressor_f ZSTD_selectBlockCompressor(ZSTD_strategy strat, ZSTD_Para ZSTD_COMPRESSBLOCK_LAZY2_DEDICATEDDICTSEARCH_ROW } }; - DEBUGLOG(4, "Selecting a row-based matchfinder"); + DEBUGLOG(5, "Selecting a row-based matchfinder"); assert(useRowMatchFinder != ZSTD_ps_auto); selectedCompressor = rowBasedBlockCompressors[(int)dictMode][(int)strat - (int)ZSTD_greedy]; } else { @@ -3619,7 +3619,7 @@ writeBlockHeader(void* op, size_t cSize, size_t blockSize, U32 lastBlock) lastBlock + (((U32)bt_rle)<<1) + (U32)(blockSize << 3) : lastBlock + (((U32)bt_compressed)<<1) + (U32)(cSize << 3); MEM_writeLE24(op, cBlockHeader); - DEBUGLOG(3, "writeBlockHeader: cSize: %zu blockSize: %zu lastBlock: %u", cSize, blockSize, lastBlock); + DEBUGLOG(5, "writeBlockHeader: cSize: %zu blockSize: %zu lastBlock: %u", cSize, 
blockSize, lastBlock); } /** ZSTD_buildBlockEntropyStats_literals() : @@ -4151,18 +4151,18 @@ ZSTD_compressSeqStore_singleBlock(ZSTD_CCtx* zc, if (cSeqsSize == 0) { cSize = ZSTD_noCompressBlock(op, dstCapacity, ip, srcSize, lastBlock); FORWARD_IF_ERROR(cSize, "Nocompress block failed"); - DEBUGLOG(4, "Writing out nocompress block, size: %zu", cSize); + DEBUGLOG(5, "Writing out nocompress block, size: %zu", cSize); *dRep = dRepOriginal; /* reset simulated decompression repcode history */ } else if (cSeqsSize == 1) { cSize = ZSTD_rleCompressBlock(op, dstCapacity, *ip, srcSize, lastBlock); FORWARD_IF_ERROR(cSize, "RLE compress block failed"); - DEBUGLOG(4, "Writing out RLE block, size: %zu", cSize); + DEBUGLOG(5, "Writing out RLE block, size: %zu", cSize); *dRep = dRepOriginal; /* reset simulated decompression repcode history */ } else { ZSTD_blockState_confirmRepcodesAndEntropyTables(&zc->blockState); writeBlockHeader(op, cSeqsSize, srcSize, lastBlock); cSize = ZSTD_blockHeaderSize + cSeqsSize; - DEBUGLOG(4, "Writing out compressed block, size: %zu", cSize); + DEBUGLOG(5, "Writing out compressed block, size: %zu", cSize); } if (zc->blockState.prevCBlock->entropy.fse.offcode_repeatMode == FSE_repeat_valid) @@ -4357,7 +4357,7 @@ ZSTD_compressBlock_splitBlock(ZSTD_CCtx* zc, { U32 nbSeq; size_t cSize; - DEBUGLOG(4, "ZSTD_compressBlock_splitBlock"); + DEBUGLOG(5, "ZSTD_compressBlock_splitBlock"); assert(zc->appliedParams.postBlockSplitter == ZSTD_ps_enable); { const size_t bss = ZSTD_buildSeqStore(zc, src, srcSize); @@ -4368,7 +4368,7 @@ ZSTD_compressBlock_splitBlock(ZSTD_CCtx* zc, RETURN_ERROR_IF(zc->seqCollector.collectSequences, sequenceProducer_failed, "Uncompressible block"); cSize = ZSTD_noCompressBlock(dst, dstCapacity, src, srcSize, lastBlock); FORWARD_IF_ERROR(cSize, "ZSTD_noCompressBlock failed"); - DEBUGLOG(4, "ZSTD_compressBlock_splitBlock: Nocompress block"); + DEBUGLOG(5, "ZSTD_compressBlock_splitBlock: Nocompress block"); return cSize; } nbSeq = 
(U32)(zc->seqStore.sequences - zc->seqStore.sequencesStart); @@ -4603,7 +4603,7 @@ static size_t ZSTD_compress_frameChunk(ZSTD_CCtx* cctx, assert(cctx->appliedParams.cParams.windowLog <= ZSTD_WINDOWLOG_MAX); - DEBUGLOG(4, "ZSTD_compress_frameChunk (blockSizeMax=%u)", (unsigned)blockSizeMax); + DEBUGLOG(5, "ZSTD_compress_frameChunk (srcSize=%u, blockSizeMax=%u)", (unsigned)srcSize, (unsigned)blockSizeMax); if (cctx->appliedParams.fParams.checksumFlag && srcSize) XXH64_update(&cctx->xxhState, src, srcSize); @@ -4898,13 +4898,14 @@ size_t ZSTD_compressBlock(ZSTD_CCtx* cctx, void* dst, size_t dstCapacity, const /*! ZSTD_loadDictionaryContent() : * @return : 0, or an error code */ -static size_t ZSTD_loadDictionaryContent(ZSTD_MatchState_t* ms, - ldmState_t* ls, - ZSTD_cwksp* ws, - ZSTD_CCtx_params const* params, - const void* src, size_t srcSize, - ZSTD_dictTableLoadMethod_e dtlm, - ZSTD_tableFillPurpose_e tfp) +static size_t +ZSTD_loadDictionaryContent(ZSTD_MatchState_t* ms, + ldmState_t* ls, + ZSTD_cwksp* ws, + ZSTD_CCtx_params const* params, + const void* src, size_t srcSize, + ZSTD_dictTableLoadMethod_e dtlm, + ZSTD_tableFillPurpose_e tfp) { const BYTE* ip = (const BYTE*) src; const BYTE* const iend = ip + srcSize; @@ -4948,17 +4949,18 @@ static size_t ZSTD_loadDictionaryContent(ZSTD_MatchState_t* ms, } ZSTD_window_update(&ms->window, src, srcSize, /* forceNonContiguous */ 0); - DEBUGLOG(4, "ZSTD_loadDictionaryContent(): useRowMatchFinder=%d", (int)params->useRowMatchFinder); + DEBUGLOG(4, "ZSTD_loadDictionaryContent: useRowMatchFinder=%d", (int)params->useRowMatchFinder); if (loadLdmDict) { /* Load the entire dict into LDM matchfinders. */ + DEBUGLOG(4, "ZSTD_loadDictionaryContent: Trigger loadLdmDict"); ZSTD_window_update(&ls->window, src, srcSize, /* forceNonContiguous */ 0); ls->loadedDictEnd = params->forceWindow ? 
0 : (U32)(iend - ls->window.base); ZSTD_ldm_fillHashTable(ls, ip, iend, &params->ldmParams); + DEBUGLOG(4, "ZSTD_loadDictionaryContent: ZSTD_ldm_fillHashTable completes"); } /* If the dict is larger than we can reasonably index in our tables, only load the suffix. */ - if (params->cParams.strategy < ZSTD_btultra) { - U32 maxDictSize = 8U << MIN(MAX(params->cParams.hashLog, params->cParams.chainLog), 28); + { U32 maxDictSize = 2U << MIN(MAX(params->cParams.hashLog, params->cParams.chainLog), 30); if (srcSize > maxDictSize) { ip = iend - maxDictSize; src = ip; @@ -5022,6 +5024,7 @@ static size_t ZSTD_loadDictionaryContent(ZSTD_MatchState_t* ms, || !defined(ZSTD_EXCLUDE_BTOPT_BLOCK_COMPRESSOR) \ || !defined(ZSTD_EXCLUDE_BTULTRA_BLOCK_COMPRESSOR) assert(srcSize >= HASH_READ_SIZE); + DEBUGLOG(4, "Fill %u bytes into the Binary Tree", (unsigned)srcSize); ZSTD_updateTree(ms, iend-HASH_READ_SIZE, iend); #else assert(0); /* shouldn't be called: cparams should've been adjusted. */ @@ -5598,14 +5601,16 @@ static size_t ZSTD_initCDict_internal( return 0; } -static ZSTD_CDict* ZSTD_createCDict_advanced_internal(size_t dictSize, - ZSTD_dictLoadMethod_e dictLoadMethod, - ZSTD_compressionParameters cParams, - ZSTD_ParamSwitch_e useRowMatchFinder, - int enableDedicatedDictSearch, - ZSTD_customMem customMem) +static ZSTD_CDict* +ZSTD_createCDict_advanced_internal(size_t dictSize, + ZSTD_dictLoadMethod_e dictLoadMethod, + ZSTD_compressionParameters cParams, + ZSTD_ParamSwitch_e useRowMatchFinder, + int enableDedicatedDictSearch, + ZSTD_customMem customMem) { if ((!customMem.customAlloc) ^ (!customMem.customFree)) return NULL; + DEBUGLOG(3, "ZSTD_createCDict_advanced_internal (dictSize=%u)", (unsigned)dictSize); { size_t const workspaceSize = ZSTD_cwksp_alloc_size(sizeof(ZSTD_CDict)) + @@ -5642,6 +5647,7 @@ ZSTD_CDict* ZSTD_createCDict_advanced(const void* dictBuffer, size_t dictSize, { ZSTD_CCtx_params cctxParams; ZSTD_memset(&cctxParams, 0, sizeof(cctxParams)); + DEBUGLOG(3, 
"ZSTD_createCDict_advanced, dictSize=%u, mode=%u", (unsigned)dictSize, (unsigned)dictContentType); ZSTD_CCtxParams_init(&cctxParams, 0); cctxParams.cParams = cParams; cctxParams.customMem = customMem; @@ -5662,7 +5668,7 @@ ZSTD_CDict* ZSTD_createCDict_advanced2( ZSTD_compressionParameters cParams; ZSTD_CDict* cdict; - DEBUGLOG(3, "ZSTD_createCDict_advanced2, mode %u", (unsigned)dictContentType); + DEBUGLOG(3, "ZSTD_createCDict_advanced2, dictSize=%u, mode=%u", (unsigned)dictSize, (unsigned)dictContentType); if (!customMem.customAlloc ^ !customMem.customFree) return NULL; if (cctxParams.enableDedicatedDictSearch) { @@ -5681,7 +5687,7 @@ ZSTD_CDict* ZSTD_createCDict_advanced2( &cctxParams, ZSTD_CONTENTSIZE_UNKNOWN, dictSize, ZSTD_cpm_createCDict); } - DEBUGLOG(3, "ZSTD_createCDict_advanced2: DDS: %u", cctxParams.enableDedicatedDictSearch); + DEBUGLOG(3, "ZSTD_createCDict_advanced2: DedicatedDictSearch=%u", cctxParams.enableDedicatedDictSearch); cctxParams.cParams = cParams; cctxParams.useRowMatchFinder = ZSTD_resolveRowMatchFinderMode(cctxParams.useRowMatchFinder, &cParams); @@ -5767,6 +5773,7 @@ const ZSTD_CDict* ZSTD_initStaticCDict( ZSTD_CDict* cdict; ZSTD_CCtx_params params; + DEBUGLOG(4, "ZSTD_initStaticCDict (dictSize==%u)", (unsigned)dictSize); if ((size_t)workspace & 7) return NULL; /* 8-aligned */ { @@ -5777,8 +5784,6 @@ const ZSTD_CDict* ZSTD_initStaticCDict( ZSTD_cwksp_move(&cdict->workspace, &ws); } - DEBUGLOG(4, "(workspaceSize < neededSize) : (%u < %u) => %u", - (unsigned)workspaceSize, (unsigned)neededSize, (unsigned)(workspaceSize < neededSize)); if (workspaceSize < neededSize) return NULL; ZSTD_CCtxParams_init(&params, 0); @@ -6357,7 +6362,7 @@ static size_t ZSTD_CCtx_init_compressStream2(ZSTD_CCtx* cctx, */ params.compressionLevel = cctx->cdict->compressionLevel; } - DEBUGLOG(4, "ZSTD_compressStream2 : transparent init stage"); + DEBUGLOG(4, "ZSTD_CCtx_init_compressStream2 : transparent init stage"); if (endOp == ZSTD_e_end) cctx->pledgedSrcSizePlusOne 
= inSize + 1; /* auto-determine pledgedSrcSize */ { size_t const dictSize = prefixDict.dict @@ -6388,9 +6393,9 @@ static size_t ZSTD_CCtx_init_compressStream2(ZSTD_CCtx* cctx, params.nbWorkers = 0; /* do not invoke multi-threading when src size is too small */ } if (params.nbWorkers > 0) { -#if ZSTD_TRACE +# if ZSTD_TRACE cctx->traceCtx = (ZSTD_trace_compress_begin != NULL) ? ZSTD_trace_compress_begin(cctx) : 0; -#endif +# endif /* mt context creation */ if (cctx->mtctx == NULL) { DEBUGLOG(4, "ZSTD_compressStream2: creating new mtctx for nbWorkers=%u", diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index e8fef5144e1..86ca0531555 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -1245,13 +1245,11 @@ size_t ZSTDMT_initCStream_internal( /* init */ if (params.nbWorkers != mtctx->params.nbWorkers) - FORWARD_IF_ERROR( ZSTDMT_resize(mtctx, params.nbWorkers) , ""); + FORWARD_IF_ERROR( ZSTDMT_resize(mtctx, (unsigned)params.nbWorkers) , ""); if (params.jobSize != 0 && params.jobSize < ZSTDMT_JOBSIZE_MIN) params.jobSize = ZSTDMT_JOBSIZE_MIN; if (params.jobSize > (size_t)ZSTDMT_JOBSIZE_MAX) params.jobSize = (size_t)ZSTDMT_JOBSIZE_MAX; - DEBUGLOG(4, "ZSTDMT_initCStream_internal: %u workers", params.nbWorkers); - if (mtctx->allJobsCompleted == 0) { /* previous compression not correctly finished */ ZSTDMT_waitForAllJobsCompleted(mtctx); ZSTDMT_releaseAllJobResources(mtctx); From 34ba14437aeeb5e678ae7f1dbdfa6333beb2723b Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Thu, 30 Jan 2025 18:05:58 -0800 Subject: [PATCH 2/9] minor boundary change improves compression ratio at low levels --- Makefile | 6 +++--- lib/compress/zstd_compress.c | 2 +- tests/regression/results.csv | 18 +++++++++--------- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/Makefile b/Makefile index 03ee6fe8928..e8e6bcffe4e 100644 --- a/Makefile +++ b/Makefile @@ -296,9 +296,9 @@ msanregressiontest: update_regressionResults : 
REGRESS_RESULTS_DIR := /tmp/regress_results_dir/ update_regressionResults: - $(MAKE) -C programs zstd - $(MAKE) -C tests/regression test - $(RM) -rf $(REGRESS_RESULTS_DIR) + $(MAKE) -j -C programs zstd + $(MAKE) -j -C tests/regression test + $(RM) -r $(REGRESS_RESULTS_DIR) $(MKDIR) $(REGRESS_RESULTS_DIR) ./tests/regression/test \ --cache tests/regression/cache \ diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index bd7667d4a96..d394617bcb8 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -4960,7 +4960,7 @@ ZSTD_loadDictionaryContent(ZSTD_MatchState_t* ms, } /* If the dict is larger than we can reasonably index in our tables, only load the suffix. */ - { U32 maxDictSize = 2U << MIN(MAX(params->cParams.hashLog, params->cParams.chainLog), 30); + { U32 maxDictSize = 1U << MIN(MAX(params->cParams.hashLog + 3, params->cParams.chainLog + 1), 31); if (srcSize > maxDictSize) { ip = iend - maxDictSize; src = ip; diff --git a/tests/regression/results.csv b/tests/regression/results.csv index 94d113578fd..505f4755b85 100644 --- a/tests/regression/results.csv +++ b/tests/regression/results.csv @@ -395,19 +395,19 @@ github, level 13 with dict, advanced github, level 13 with dict dms, advanced one pass, 39900 github, level 13 with dict dds, advanced one pass, 39900 github, level 13 with dict copy, advanced one pass, 39948 -github, level 13 with dict load, advanced one pass, 42624 +github, level 13 with dict load, advanced one pass, 42643 github, level 16, advanced one pass, 133209 github, level 16 with dict, advanced one pass, 37902 github, level 16 with dict dms, advanced one pass, 37902 github, level 16 with dict dds, advanced one pass, 37902 github, level 16 with dict copy, advanced one pass, 37892 -github, level 16 with dict load, advanced one pass, 42402 +github, level 16 with dict load, advanced one pass, 42434 github, level 19, advanced one pass, 132879 github, level 19 with dict, advanced one pass, 37916 github, level 
19 with dict dms, advanced one pass, 37916 github, level 19 with dict dds, advanced one pass, 37916 github, level 19 with dict copy, advanced one pass, 37906 -github, level 19 with dict load, advanced one pass, 39770 +github, level 19 with dict load, advanced one pass, 40405 github, no source size, advanced one pass, 136331 github, no source size with dict, advanced one pass, 41118 github, long distance mode, advanced one pass, 136331 @@ -713,19 +713,19 @@ github, level 13 with dict, advanced github, level 13 with dict dms, advanced one pass small out, 39900 github, level 13 with dict dds, advanced one pass small out, 39900 github, level 13 with dict copy, advanced one pass small out, 39948 -github, level 13 with dict load, advanced one pass small out, 42624 +github, level 13 with dict load, advanced one pass small out, 42643 github, level 16, advanced one pass small out, 133209 github, level 16 with dict, advanced one pass small out, 37902 github, level 16 with dict dms, advanced one pass small out, 37902 github, level 16 with dict dds, advanced one pass small out, 37902 github, level 16 with dict copy, advanced one pass small out, 37892 -github, level 16 with dict load, advanced one pass small out, 42402 +github, level 16 with dict load, advanced one pass small out, 42434 github, level 19, advanced one pass small out, 132879 github, level 19 with dict, advanced one pass small out, 37916 github, level 19 with dict dms, advanced one pass small out, 37916 github, level 19 with dict dds, advanced one pass small out, 37916 github, level 19 with dict copy, advanced one pass small out, 37906 -github, level 19 with dict load, advanced one pass small out, 39770 +github, level 19 with dict load, advanced one pass small out, 40405 github, no source size, advanced one pass small out, 136331 github, no source size with dict, advanced one pass small out, 41118 github, long distance mode, advanced one pass small out, 136331 @@ -1031,19 +1031,19 @@ github, level 13 with dict, 
advanced github, level 13 with dict dms, advanced streaming, 39900 github, level 13 with dict dds, advanced streaming, 39900 github, level 13 with dict copy, advanced streaming, 39948 -github, level 13 with dict load, advanced streaming, 42624 +github, level 13 with dict load, advanced streaming, 42643 github, level 16, advanced streaming, 133209 github, level 16 with dict, advanced streaming, 37902 github, level 16 with dict dms, advanced streaming, 37902 github, level 16 with dict dds, advanced streaming, 37902 github, level 16 with dict copy, advanced streaming, 37892 -github, level 16 with dict load, advanced streaming, 42402 +github, level 16 with dict load, advanced streaming, 42434 github, level 19, advanced streaming, 132879 github, level 19 with dict, advanced streaming, 37916 github, level 19 with dict dms, advanced streaming, 37916 github, level 19 with dict dds, advanced streaming, 37916 github, level 19 with dict copy, advanced streaming, 37906 -github, level 19 with dict load, advanced streaming, 39770 +github, level 19 with dict load, advanced streaming, 40405 github, no source size, advanced streaming, 136331 github, no source size with dict, advanced streaming, 41118 github, long distance mode, advanced streaming, 136331 From e637fc64c5f918e316146fb1d78c1cb587b1134c Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Fri, 31 Jan 2025 15:00:36 -0800 Subject: [PATCH 3/9] update type naming convention --- lib/compress/zstdmt_compress.c | 102 ++++++++++++++++----------------- 1 file changed, 51 insertions(+), 51 deletions(-) diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 86ca0531555..fe98dab1c57 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -90,9 +90,9 @@ static unsigned long long GetCurrentClockTimeMicroseconds(void) typedef struct buffer_s { void* start; size_t capacity; -} buffer_t; +} Buffer; -static const buffer_t g_nullBuffer = { NULL, 0 }; +static const Buffer g_nullBuffer = 
{ NULL, 0 }; typedef struct ZSTDMT_bufferPool_s { ZSTD_pthread_mutex_t poolMutex; @@ -100,7 +100,7 @@ typedef struct ZSTDMT_bufferPool_s { unsigned totalBuffers; unsigned nbBuffers; ZSTD_customMem cMem; - buffer_t* buffers; + Buffer* buffers; } ZSTDMT_bufferPool; static void ZSTDMT_freeBufferPool(ZSTDMT_bufferPool* bufPool) @@ -128,7 +128,7 @@ static ZSTDMT_bufferPool* ZSTDMT_createBufferPool(unsigned maxNbBuffers, ZSTD_cu ZSTD_customFree(bufPool, cMem); return NULL; } - bufPool->buffers = (buffer_t*)ZSTD_customCalloc(maxNbBuffers * sizeof(buffer_t), cMem); + bufPool->buffers = (Buffer*)ZSTD_customCalloc(maxNbBuffers * sizeof(Buffer), cMem); if (bufPool->buffers==NULL) { ZSTDMT_freeBufferPool(bufPool); return NULL; @@ -144,7 +144,7 @@ static ZSTDMT_bufferPool* ZSTDMT_createBufferPool(unsigned maxNbBuffers, ZSTD_cu static size_t ZSTDMT_sizeof_bufferPool(ZSTDMT_bufferPool* bufPool) { size_t const poolSize = sizeof(*bufPool); - size_t const arraySize = bufPool->totalBuffers * sizeof(buffer_t); + size_t const arraySize = bufPool->totalBuffers * sizeof(Buffer); unsigned u; size_t totalBufferSize = 0; ZSTD_pthread_mutex_lock(&bufPool->poolMutex); @@ -189,13 +189,13 @@ static ZSTDMT_bufferPool* ZSTDMT_expandBufferPool(ZSTDMT_bufferPool* srcBufPool, * assumption : bufPool must be valid * @return : a buffer, with start pointer and size * note: allocation may fail, in this case, start==NULL and size==0 */ -static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* bufPool) +static Buffer ZSTDMT_getBuffer(ZSTDMT_bufferPool* bufPool) { size_t const bSize = bufPool->bufferSize; DEBUGLOG(5, "ZSTDMT_getBuffer: bSize = %u", (U32)bufPool->bufferSize); ZSTD_pthread_mutex_lock(&bufPool->poolMutex); if (bufPool->nbBuffers) { /* try to use an existing buffer */ - buffer_t const buf = bufPool->buffers[--(bufPool->nbBuffers)]; + Buffer const buf = bufPool->buffers[--(bufPool->nbBuffers)]; size_t const availBufferSize = buf.capacity; bufPool->buffers[bufPool->nbBuffers] = g_nullBuffer; if 
((availBufferSize >= bSize) & ((availBufferSize>>3) <= bSize)) { @@ -212,7 +212,7 @@ static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* bufPool) ZSTD_pthread_mutex_unlock(&bufPool->poolMutex); /* create new buffer */ DEBUGLOG(5, "ZSTDMT_getBuffer: create a new buffer"); - { buffer_t buffer; + { Buffer buffer; void* const start = ZSTD_customMalloc(bSize, bufPool->cMem); buffer.start = start; /* note : start can be NULL if malloc fails ! */ buffer.capacity = (start==NULL) ? 0 : bSize; @@ -231,12 +231,12 @@ static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* bufPool) * @return : a buffer that is at least the buffer pool buffer size. * If a reallocation happens, the data in the input buffer is copied. */ -static buffer_t ZSTDMT_resizeBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buffer) +static Buffer ZSTDMT_resizeBuffer(ZSTDMT_bufferPool* bufPool, Buffer buffer) { size_t const bSize = bufPool->bufferSize; if (buffer.capacity < bSize) { void* const start = ZSTD_customMalloc(bSize, bufPool->cMem); - buffer_t newBuffer; + Buffer newBuffer; newBuffer.start = start; newBuffer.capacity = start == NULL ? 
0 : bSize; if (start != NULL) { @@ -252,7 +252,7 @@ static buffer_t ZSTDMT_resizeBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buffer) #endif /* store buffer for later re-use, up to pool capacity */ -static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf) +static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, Buffer buf) { DEBUGLOG(5, "ZSTDMT_releaseBuffer"); if (buf.start == NULL) return; /* compatible with release on NULL */ @@ -290,7 +290,7 @@ static size_t ZSTDMT_sizeof_seqPool(ZSTDMT_seqPool* seqPool) return ZSTDMT_sizeof_bufferPool(seqPool); } -static RawSeqStore_t bufferToSeq(buffer_t buffer) +static RawSeqStore_t bufferToSeq(Buffer buffer) { RawSeqStore_t seq = kNullRawSeqStore; seq.seq = (rawSeq*)buffer.start; @@ -298,9 +298,9 @@ static RawSeqStore_t bufferToSeq(buffer_t buffer) return seq; } -static buffer_t seqToBuffer(RawSeqStore_t seq) +static Buffer seqToBuffer(RawSeqStore_t seq) { - buffer_t buffer; + Buffer buffer; buffer.start = seq.seq; buffer.capacity = seq.capacity * sizeof(rawSeq); return buffer; @@ -466,7 +466,7 @@ static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx) typedef struct { void const* start; size_t size; -} range_t; +} Range; typedef struct { /* All variables in the struct are protected by mutex. 
*/ @@ -482,10 +482,10 @@ typedef struct { ZSTD_pthread_mutex_t ldmWindowMutex; ZSTD_pthread_cond_t ldmWindowCond; /* Signaled when ldmWindow is updated */ ZSTD_window_t ldmWindow; /* A thread-safe copy of ldmState.window */ -} serialState_t; +} SerialState; static int -ZSTDMT_serialState_reset(serialState_t* serialState, +ZSTDMT_serialState_reset(SerialState* serialState, ZSTDMT_seqPool* seqPool, ZSTD_CCtx_params params, size_t jobSize, @@ -555,7 +555,7 @@ ZSTDMT_serialState_reset(serialState_t* serialState, return 0; } -static int ZSTDMT_serialState_init(serialState_t* serialState) +static int ZSTDMT_serialState_init(SerialState* serialState) { int initError = 0; ZSTD_memset(serialState, 0, sizeof(*serialState)); @@ -566,7 +566,7 @@ static int ZSTDMT_serialState_init(serialState_t* serialState) return initError; } -static void ZSTDMT_serialState_free(serialState_t* serialState) +static void ZSTDMT_serialState_free(SerialState* serialState) { ZSTD_customMem cMem = serialState->params.customMem; ZSTD_pthread_mutex_destroy(&serialState->mutex); @@ -577,9 +577,9 @@ static void ZSTDMT_serialState_free(serialState_t* serialState) ZSTD_customFree(serialState->ldmState.bucketOffsets, cMem); } -static void ZSTDMT_serialState_update(serialState_t* serialState, +static void ZSTDMT_serialState_update(SerialState* serialState, ZSTD_CCtx* jobCCtx, RawSeqStore_t seqStore, - range_t src, unsigned jobID) + Range src, unsigned jobID) { /* Wait for our turn */ ZSTD_PTHREAD_MUTEX_LOCK(&serialState->mutex); @@ -623,7 +623,7 @@ static void ZSTDMT_serialState_update(serialState_t* serialState, } } -static void ZSTDMT_serialState_ensureFinished(serialState_t* serialState, +static void ZSTDMT_serialState_ensureFinished(SerialState* serialState, unsigned jobID, size_t cSize) { ZSTD_PTHREAD_MUTEX_LOCK(&serialState->mutex); @@ -647,7 +647,7 @@ static void ZSTDMT_serialState_ensureFinished(serialState_t* serialState, /* ===== Worker thread ===== */ /* 
------------------------------------------ */ -static const range_t kNullRange = { NULL, 0 }; +static const Range kNullRange = { NULL, 0 }; typedef struct { size_t consumed; /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx */ @@ -657,10 +657,10 @@ typedef struct { ZSTDMT_CCtxPool* cctxPool; /* Thread-safe - used by mtctx and (all) workers */ ZSTDMT_bufferPool* bufPool; /* Thread-safe - used by mtctx and (all) workers */ ZSTDMT_seqPool* seqPool; /* Thread-safe - used by mtctx and (all) workers */ - serialState_t* serial; /* Thread-safe - used by mtctx and (all) workers */ - buffer_t dstBuff; /* set by worker (or mtctx), then read by worker & mtctx, then modified by mtctx => no barrier */ - range_t prefix; /* set by mtctx, then read by worker & mtctx => no barrier */ - range_t src; /* set by mtctx, then read by worker & mtctx => no barrier */ + SerialState* serial; /* Thread-safe - used by mtctx and (all) workers */ + Buffer dstBuff; /* set by worker (or mtctx), then read by worker & mtctx, then modified by mtctx => no barrier */ + Range prefix; /* set by mtctx, then read by worker & mtctx => no barrier */ + Range src; /* set by mtctx, then read by worker & mtctx => no barrier */ unsigned jobID; /* set by mtctx, then read by worker => no barrier */ unsigned firstJob; /* set by mtctx, then read by worker => no barrier */ unsigned lastJob; /* set by mtctx, then read by worker => no barrier */ @@ -686,7 +686,7 @@ static void ZSTDMT_compressionJob(void* jobDescription) ZSTD_CCtx_params jobParams = job->params; /* do not modify job->params ! 
copy it, modify the copy */ ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(job->cctxPool); RawSeqStore_t rawSeqStore = ZSTDMT_getSeq(job->seqPool); - buffer_t dstBuff = job->dstBuff; + Buffer dstBuff = job->dstBuff; size_t lastCBlockSize = 0; /* resources */ @@ -809,10 +809,10 @@ static void ZSTDMT_compressionJob(void* jobDescription) /* ------------------------------------------ */ typedef struct { - range_t prefix; /* read-only non-owned prefix buffer */ - buffer_t buffer; + Range prefix; /* read-only non-owned prefix buffer */ + Buffer buffer; size_t filled; -} inBuff_t; +} InBuff_t; typedef struct { BYTE* buffer; /* The round input buffer. All jobs get references @@ -826,9 +826,9 @@ typedef struct { * the inBuff is sent to the worker thread. * pos <= capacity. */ -} roundBuff_t; +} RoundBuff_t; -static const roundBuff_t kNullRoundBuff = {NULL, 0, 0}; +static const RoundBuff_t kNullRoundBuff = {NULL, 0, 0}; #define RSYNC_LENGTH 32 /* Don't create chunks smaller than the zstd block size. @@ -845,7 +845,7 @@ typedef struct { U64 hash; U64 hitMask; U64 primePower; -} rsyncState_t; +} RSyncState_t; struct ZSTDMT_CCtx_s { POOL_ctx* factory; @@ -857,10 +857,10 @@ struct ZSTDMT_CCtx_s { size_t targetSectionSize; size_t targetPrefixSize; int jobReady; /* 1 => one job is already prepared, but pool has shortage of workers. Don't create a new job. */ - inBuff_t inBuff; - roundBuff_t roundBuff; - serialState_t serial; - rsyncState_t rsync; + InBuff_t inBuff; + RoundBuff_t roundBuff; + SerialState serial; + RSyncState_t rsync; unsigned jobIDMask; unsigned doneJobID; unsigned nextJobID; @@ -1538,7 +1538,7 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u * If the data of the first job is broken up into two segments, we cover both * sections. 
*/ -static range_t ZSTDMT_getInputDataInUse(ZSTDMT_CCtx* mtctx) +static Range ZSTDMT_getInputDataInUse(ZSTDMT_CCtx* mtctx) { unsigned const firstJobID = mtctx->doneJobID; unsigned const lastJobID = mtctx->nextJobID; @@ -1553,7 +1553,7 @@ static range_t ZSTDMT_getInputDataInUse(ZSTDMT_CCtx* mtctx) ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex); if (consumed < mtctx->jobs[wJobID].src.size) { - range_t range = mtctx->jobs[wJobID].prefix; + Range range = mtctx->jobs[wJobID].prefix; if (range.size == 0) { /* Empty prefix */ range = mtctx->jobs[wJobID].src; @@ -1569,7 +1569,7 @@ static range_t ZSTDMT_getInputDataInUse(ZSTDMT_CCtx* mtctx) /** * Returns non-zero iff buffer and range overlap. */ -static int ZSTDMT_isOverlapped(buffer_t buffer, range_t range) +static int ZSTDMT_isOverlapped(Buffer buffer, Range range) { BYTE const* const bufferStart = (BYTE const*)buffer.start; BYTE const* const rangeStart = (BYTE const*)range.start; @@ -1589,10 +1589,10 @@ static int ZSTDMT_isOverlapped(buffer_t buffer, range_t range) } } -static int ZSTDMT_doesOverlapWindow(buffer_t buffer, ZSTD_window_t window) +static int ZSTDMT_doesOverlapWindow(Buffer buffer, ZSTD_window_t window) { - range_t extDict; - range_t prefix; + Range extDict; + Range prefix; DEBUGLOG(5, "ZSTDMT_doesOverlapWindow"); extDict.start = window.dictBase + window.lowLimit; @@ -1611,7 +1611,7 @@ static int ZSTDMT_doesOverlapWindow(buffer_t buffer, ZSTD_window_t window) || ZSTDMT_isOverlapped(buffer, prefix); } -static void ZSTDMT_waitForLdmComplete(ZSTDMT_CCtx* mtctx, buffer_t buffer) +static void ZSTDMT_waitForLdmComplete(ZSTDMT_CCtx* mtctx, Buffer buffer) { if (mtctx->params.ldmParams.enableLdm == ZSTD_ps_enable) { ZSTD_pthread_mutex_t* mutex = &mtctx->serial.ldmWindowMutex; @@ -1636,10 +1636,10 @@ static void ZSTDMT_waitForLdmComplete(ZSTDMT_CCtx* mtctx, buffer_t buffer) */ static int ZSTDMT_tryGetInputRange(ZSTDMT_CCtx* mtctx) { - range_t const inUse = ZSTDMT_getInputDataInUse(mtctx); + Range const 
inUse = ZSTDMT_getInputDataInUse(mtctx); size_t const spaceLeft = mtctx->roundBuff.capacity - mtctx->roundBuff.pos; size_t const target = mtctx->targetSectionSize; - buffer_t buffer; + Buffer buffer; DEBUGLOG(5, "ZSTDMT_tryGetInputRange"); assert(mtctx->inBuff.buffer.start == NULL); @@ -1691,7 +1691,7 @@ static int ZSTDMT_tryGetInputRange(ZSTDMT_CCtx* mtctx) typedef struct { size_t toLoad; /* The number of bytes to load from the input. */ int flush; /* Boolean declaring if we must flush because we found a synchronization point. */ -} syncPoint_t; +} SyncPoint; /** * Searches through the input for a synchronization point. If one is found, we @@ -1699,14 +1699,14 @@ typedef struct { * Otherwise, we will load as many bytes as possible and instruct the caller * to continue as normal. */ -static syncPoint_t +static SyncPoint findSynchronizationPoint(ZSTDMT_CCtx const* mtctx, ZSTD_inBuffer const input) { BYTE const* const istart = (BYTE const*)input.src + input.pos; U64 const primePower = mtctx->rsync.primePower; U64 const hitMask = mtctx->rsync.hitMask; - syncPoint_t syncPoint; + SyncPoint syncPoint; U64 hash; BYTE const* prev; size_t pos; @@ -1838,7 +1838,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, DEBUGLOG(5, "ZSTDMT_tryGetInputRange completed successfully : mtctx->inBuff.buffer.start = %p", mtctx->inBuff.buffer.start); } if (mtctx->inBuff.buffer.start != NULL) { - syncPoint_t const syncPoint = findSynchronizationPoint(mtctx, *input); + SyncPoint const syncPoint = findSynchronizationPoint(mtctx, *input); if (syncPoint.flush && endOp == ZSTD_e_continue) { endOp = ZSTD_e_flush; } From 85a44b233accb544d89c85b804182e3f34e8d4b1 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Fri, 31 Jan 2025 15:53:25 -0800 Subject: [PATCH 4/9] always free .cdictLocal --- lib/compress/zstdmt_compress.c | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index fe98dab1c57..dbaffe53643 100644 
--- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -1258,15 +1258,14 @@ size_t ZSTDMT_initCStream_internal( mtctx->params = params; mtctx->frameContentSize = pledgedSrcSize; + ZSTD_freeCDict(mtctx->cdictLocal); if (dict) { - ZSTD_freeCDict(mtctx->cdictLocal); mtctx->cdictLocal = ZSTD_createCDict_advanced(dict, dictSize, ZSTD_dlm_byCopy, dictContentType, /* note : a loadPrefix becomes an internal CDict */ params.cParams, mtctx->cMem); mtctx->cdict = mtctx->cdictLocal; if (mtctx->cdictLocal == NULL) return ERROR(memory_allocation); } else { - ZSTD_freeCDict(mtctx->cdictLocal); mtctx->cdictLocal = NULL; mtctx->cdict = cdict; } @@ -1401,7 +1400,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS mtctx->roundBuff.pos += srcSize; mtctx->inBuff.buffer = g_nullBuffer; mtctx->inBuff.filled = 0; - /* Set the prefix */ + /* Set the prefix for next job */ if (!endFrame) { size_t const newPrefixSize = MIN(srcSize, mtctx->targetPrefixSize); mtctx->inBuff.prefix.start = src + srcSize - newPrefixSize; From 220abe6da857142305ab7337b346c826856bcfd1 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Fri, 31 Jan 2025 18:19:45 -0800 Subject: [PATCH 5/9] reduced memory usage by avoiding to duplicate in memory a dictionary that was passed by reference. 
--- lib/compress/zstd_compress.c | 12 +++++++----- lib/compress/zstd_compress_internal.h | 27 ++++++++++++++------------- lib/compress/zstd_lazy.c | 16 ++++++++-------- lib/compress/zstd_opt.c | 14 +++++++------- lib/compress/zstdmt_compress.c | 4 ++-- lib/compress/zstdmt_compress.h | 2 +- 6 files changed, 39 insertions(+), 36 deletions(-) diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index d394617bcb8..d7585c884a3 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -1354,10 +1354,12 @@ size_t ZSTD_CCtx_refPrefix_advanced( RETURN_ERROR_IF(cctx->streamStage != zcss_init, stage_wrong, "Can't ref a prefix when ctx not in init stage."); ZSTD_clearAllDicts(cctx); - if (prefix != NULL && prefixSize > 0) { + if (prefixSize > 0) { + RETURN_ERROR_IF(prefix == NULL, dictionary_wrong, "Invalid prefix pointer"); cctx->prefixDict.dict = prefix; cctx->prefixDict.dictSize = prefixSize; cctx->prefixDict.dictContentType = dictContentType; + cctx->prefixDict.loadMethod = ZSTD_dlm_byRef; } return 0; } @@ -3066,7 +3068,7 @@ ZSTD_entropyCompressSeqStore( /* ZSTD_selectBlockCompressor() : * Not static, but internal use only (used by long distance matcher) * assumption : strat is a valid strategy */ -ZSTD_BlockCompressor_f ZSTD_selectBlockCompressor(ZSTD_strategy strat, ZSTD_ParamSwitch_e useRowMatchFinder, ZSTD_dictMode_e dictMode) +ZSTD_BlockCompressor_f ZSTD_selectBlockCompressor(ZSTD_strategy strat, ZSTD_ParamSwitch_e useRowMatchFinder, ZSTD_DictMode_e dictMode) { static const ZSTD_BlockCompressor_f blockCompressor[4][ZSTD_STRATEGY_MAX+1] = { { ZSTD_compressBlock_fast /* default for 0 */, @@ -3298,7 +3300,7 @@ static size_t ZSTD_buildSeqStore(ZSTD_CCtx* zc, const void* src, size_t srcSize) } /* select and store sequences */ - { ZSTD_dictMode_e const dictMode = ZSTD_matchState_dictMode(ms); + { ZSTD_DictMode_e const dictMode = ZSTD_matchState_dictMode(ms); size_t lastLLSize; { int i; for (i = 0; i < ZSTD_REP_NUM; ++i) @@ -6351,7 
+6353,7 @@ static size_t ZSTD_CCtx_init_compressStream2(ZSTD_CCtx* cctx, size_t inSize) { ZSTD_CCtx_params params = cctx->requestedParams; - ZSTD_prefixDict const prefixDict = cctx->prefixDict; + ZSTD_PrefixDict const prefixDict = cctx->prefixDict; FORWARD_IF_ERROR( ZSTD_initLocalDict(cctx) , ""); /* Init the local dict if present. */ ZSTD_memset(&cctx->prefixDict, 0, sizeof(cctx->prefixDict)); /* single usage */ assert(prefixDict.dict==NULL || cctx->cdict==NULL); /* only one can be set */ @@ -6407,7 +6409,7 @@ static size_t ZSTD_CCtx_init_compressStream2(ZSTD_CCtx* cctx, DEBUGLOG(4, "call ZSTDMT_initCStream_internal as nbWorkers=%u", params.nbWorkers); FORWARD_IF_ERROR( ZSTDMT_initCStream_internal( cctx->mtctx, - prefixDict.dict, prefixDict.dictSize, prefixDict.dictContentType, + prefixDict.dict, prefixDict.dictSize, prefixDict.dictContentType, prefixDict.loadMethod, cctx->cdict, params, cctx->pledgedSrcSizePlusOne-1) , ""); cctx->dictID = cctx->cdict ? cctx->cdict->dictID : 0; cctx->dictContentSize = cctx->cdict ? cctx->cdict->dictContentSize : prefixDict.dictSize; diff --git a/lib/compress/zstd_compress_internal.h b/lib/compress/zstd_compress_internal.h index ca5e2a4c5bf..f71c47e880c 100644 --- a/lib/compress/zstd_compress_internal.h +++ b/lib/compress/zstd_compress_internal.h @@ -12,8 +12,8 @@ * that shall **only** be used by modules within lib/compress. 
*/ -#ifndef ZSTD_COMPRESS_H -#define ZSTD_COMPRESS_H +#ifndef ZSTD_COMPRESS_INTERNAL_H +#define ZSTD_COMPRESS_INTERNAL_H /*-************************************* * Dependencies @@ -21,10 +21,10 @@ #include "../common/zstd_internal.h" #include "zstd_cwksp.h" #ifdef ZSTD_MULTITHREAD -# include "zstdmt_compress.h" +# include "zstdmt_compress.h" /* ZSTDMT_CCtx */ #endif #include "../common/bits.h" /* ZSTD_highbit32, ZSTD_NbCommonBytes */ -#include "zstd_preSplit.h" /* ZSTD_SLIPBLOCK_WORKSPACESIZE */ +#include "zstd_preSplit.h" /* ZSTD_SLIPBLOCK_WORKSPACESIZE */ /*-************************************* * Constants @@ -46,11 +46,12 @@ typedef enum { ZSTDcs_created=0, ZSTDcs_init, ZSTDcs_ongoing, ZSTDcs_ending } ZSTD_compressionStage_e; typedef enum { zcss_init=0, zcss_load, zcss_flush } ZSTD_cStreamStage; -typedef struct ZSTD_prefixDict_s { +typedef struct { const void* dict; size_t dictSize; ZSTD_dictContentType_e dictContentType; -} ZSTD_prefixDict; + ZSTD_dictLoadMethod_e loadMethod; +} ZSTD_PrefixDict; typedef struct { void* dictBuffer; @@ -467,7 +468,7 @@ typedef struct { U32 partitions[ZSTD_MAX_NB_BLOCK_SPLITS]; ZSTD_entropyCTablesMetadata_t entropyMetadata; -} ZSTD_blockSplitCtx; +} ZSTD_BlockSplitCtx; struct ZSTD_CCtx_s { ZSTD_compressionStage_e stage; @@ -525,7 +526,7 @@ struct ZSTD_CCtx_s { /* Dictionary */ ZSTD_localDict localDict; const ZSTD_CDict* cdict; - ZSTD_prefixDict prefixDict; /* single-usage dictionary */ + ZSTD_PrefixDict prefixDict; /* single-usage dictionary */ /* Multi-threading */ #ifdef ZSTD_MULTITHREAD @@ -538,7 +539,7 @@ struct ZSTD_CCtx_s { #endif /* Workspace for block splitter */ - ZSTD_blockSplitCtx blockSplitCtx; + ZSTD_BlockSplitCtx blockSplitCtx; /* Buffer for output from external sequence producer */ ZSTD_Sequence* extSeqBuf; @@ -553,7 +554,7 @@ typedef enum { ZSTD_extDict = 1, ZSTD_dictMatchState = 2, ZSTD_dedicatedDictSearch = 3 -} ZSTD_dictMode_e; +} ZSTD_DictMode_e; typedef enum { ZSTD_cpm_noAttachDict = 0, /* Compression with 
ZSTD_noDict or ZSTD_extDict. @@ -578,7 +579,7 @@ typedef enum { typedef size_t (*ZSTD_BlockCompressor_f) ( ZSTD_MatchState_t* bs, SeqStore_t* seqStore, U32 rep[ZSTD_REP_NUM], void const* src, size_t srcSize); -ZSTD_BlockCompressor_f ZSTD_selectBlockCompressor(ZSTD_strategy strat, ZSTD_ParamSwitch_e rowMatchfinderMode, ZSTD_dictMode_e dictMode); +ZSTD_BlockCompressor_f ZSTD_selectBlockCompressor(ZSTD_strategy strat, ZSTD_ParamSwitch_e rowMatchfinderMode, ZSTD_DictMode_e dictMode); MEM_STATIC U32 ZSTD_LLcode(U32 litLength) @@ -1068,7 +1069,7 @@ MEM_STATIC U32 ZSTD_window_hasExtDict(ZSTD_window_t const window) * Inspects the provided matchState and figures out what dictMode should be * passed to the compressor. */ -MEM_STATIC ZSTD_dictMode_e ZSTD_matchState_dictMode(const ZSTD_MatchState_t *ms) +MEM_STATIC ZSTD_DictMode_e ZSTD_matchState_dictMode(const ZSTD_MatchState_t *ms) { return ZSTD_window_hasExtDict(ms->window) ? ZSTD_extDict : @@ -1633,4 +1634,4 @@ size_t ZSTD_compressEnd_public(ZSTD_CCtx* cctx, size_t ZSTD_compressBlock_deprecated(ZSTD_CCtx* cctx, void* dst, size_t dstCapacity, const void* src, size_t srcSize); -#endif /* ZSTD_COMPRESS_H */ +#endif /* ZSTD_COMPRESS_INTERNAL_H */ diff --git a/lib/compress/zstd_lazy.c b/lib/compress/zstd_lazy.c index 272ebe0ece7..1d5b37c20cc 100644 --- a/lib/compress/zstd_lazy.c +++ b/lib/compress/zstd_lazy.c @@ -74,7 +74,7 @@ ZSTD_ALLOW_POINTER_OVERFLOW_ATTR void ZSTD_insertDUBT1(const ZSTD_MatchState_t* ms, U32 curr, const BYTE* inputEnd, U32 nbCompares, U32 btLow, - const ZSTD_dictMode_e dictMode) + const ZSTD_DictMode_e dictMode) { const ZSTD_compressionParameters* const cParams = &ms->cParams; U32* const bt = ms->chainTable; @@ -168,7 +168,7 @@ size_t ZSTD_DUBT_findBetterDictMatch ( size_t bestLength, U32 nbCompares, U32 const mls, - const ZSTD_dictMode_e dictMode) + const ZSTD_DictMode_e dictMode) { const ZSTD_MatchState_t * const dms = ms->dictMatchState; const ZSTD_compressionParameters* const dmsCParams = 
&dms->cParams; @@ -244,7 +244,7 @@ size_t ZSTD_DUBT_findBestMatch(ZSTD_MatchState_t* ms, const BYTE* const ip, const BYTE* const iend, size_t* offBasePtr, U32 const mls, - const ZSTD_dictMode_e dictMode) + const ZSTD_DictMode_e dictMode) { const ZSTD_compressionParameters* const cParams = &ms->cParams; U32* const hashTable = ms->hashTable; @@ -396,7 +396,7 @@ size_t ZSTD_BtFindBestMatch( ZSTD_MatchState_t* ms, const BYTE* const ip, const BYTE* const iLimit, size_t* offBasePtr, const U32 mls /* template */, - const ZSTD_dictMode_e dictMode) + const ZSTD_DictMode_e dictMode) { DEBUGLOG(7, "ZSTD_BtFindBestMatch"); if (ip < ms->window.base + ms->nextToUpdate) return 0; /* skipped area */ @@ -668,7 +668,7 @@ size_t ZSTD_HcFindBestMatch( ZSTD_MatchState_t* ms, const BYTE* const ip, const BYTE* const iLimit, size_t* offsetPtr, - const U32 mls, const ZSTD_dictMode_e dictMode) + const U32 mls, const ZSTD_DictMode_e dictMode) { const ZSTD_compressionParameters* const cParams = &ms->cParams; U32* const chainTable = ms->chainTable; @@ -1142,7 +1142,7 @@ size_t ZSTD_RowFindBestMatch( ZSTD_MatchState_t* ms, const BYTE* const ip, const BYTE* const iLimit, size_t* offsetPtr, - const U32 mls, const ZSTD_dictMode_e dictMode, + const U32 mls, const ZSTD_DictMode_e dictMode, const U32 rowLog) { U32* const hashTable = ms->hashTable; @@ -1492,7 +1492,7 @@ FORCE_INLINE_TEMPLATE size_t ZSTD_searchMax( U32 const mls, U32 const rowLog, searchMethod_e const searchMethod, - ZSTD_dictMode_e const dictMode) + ZSTD_DictMode_e const dictMode) { if (dictMode == ZSTD_noDict) { ZSTD_SWITCH_SEARCH_METHOD(noDict) @@ -1518,7 +1518,7 @@ size_t ZSTD_compressBlock_lazy_generic( U32 rep[ZSTD_REP_NUM], const void* src, size_t srcSize, const searchMethod_e searchMethod, const U32 depth, - ZSTD_dictMode_e const dictMode) + ZSTD_DictMode_e const dictMode) { const BYTE* const istart = (const BYTE*)src; const BYTE* ip = istart; diff --git a/lib/compress/zstd_opt.c b/lib/compress/zstd_opt.c index 
3d7171b755b..09de5a9d9be 100644 --- a/lib/compress/zstd_opt.c +++ b/lib/compress/zstd_opt.c @@ -562,7 +562,7 @@ ZSTD_ALLOW_POINTER_OVERFLOW_ATTR void ZSTD_updateTree_internal( ZSTD_MatchState_t* ms, const BYTE* const ip, const BYTE* const iend, - const U32 mls, const ZSTD_dictMode_e dictMode) + const U32 mls, const ZSTD_DictMode_e dictMode) { const BYTE* const base = ms->window.base; U32 const target = (U32)(ip - base); @@ -592,7 +592,7 @@ ZSTD_insertBtAndGetAllMatches ( ZSTD_MatchState_t* ms, U32* nextToUpdate3, const BYTE* const ip, const BYTE* const iLimit, - const ZSTD_dictMode_e dictMode, + const ZSTD_DictMode_e dictMode, const U32 rep[ZSTD_REP_NUM], const U32 ll0, /* tells if associated literal length is 0 or not. This value must be 0 or 1 */ const U32 lengthToBeat, @@ -838,7 +838,7 @@ U32 ZSTD_btGetAllMatches_internal( const U32 rep[ZSTD_REP_NUM], U32 const ll0, U32 const lengthToBeat, - const ZSTD_dictMode_e dictMode, + const ZSTD_DictMode_e dictMode, const U32 mls) { assert(BOUNDED(3, ms->cParams.minMatch, 6) == mls); @@ -886,7 +886,7 @@ GEN_ZSTD_BT_GET_ALL_MATCHES(dictMatchState) } static ZSTD_getAllMatchesFn -ZSTD_selectBtGetAllMatches(ZSTD_MatchState_t const* ms, ZSTD_dictMode_e const dictMode) +ZSTD_selectBtGetAllMatches(ZSTD_MatchState_t const* ms, ZSTD_DictMode_e const dictMode) { ZSTD_getAllMatchesFn const getAllMatchesFns[3][4] = { ZSTD_BT_GET_ALL_MATCHES_ARRAY(noDict), @@ -1079,7 +1079,7 @@ ZSTD_compressBlock_opt_generic(ZSTD_MatchState_t* ms, U32 rep[ZSTD_REP_NUM], const void* src, size_t srcSize, const int optLevel, - const ZSTD_dictMode_e dictMode) + const ZSTD_DictMode_e dictMode) { optState_t* const optStatePtr = &ms->opt; const BYTE* const istart = (const BYTE*)src; @@ -1445,7 +1445,7 @@ ZSTD_compressBlock_opt_generic(ZSTD_MatchState_t* ms, #ifndef ZSTD_EXCLUDE_BTOPT_BLOCK_COMPRESSOR static size_t ZSTD_compressBlock_opt0( ZSTD_MatchState_t* ms, SeqStore_t* seqStore, U32 rep[ZSTD_REP_NUM], - const void* src, size_t srcSize, const 
ZSTD_dictMode_e dictMode) + const void* src, size_t srcSize, const ZSTD_DictMode_e dictMode) { return ZSTD_compressBlock_opt_generic(ms, seqStore, rep, src, srcSize, 0 /* optLevel */, dictMode); } @@ -1454,7 +1454,7 @@ static size_t ZSTD_compressBlock_opt0( #ifndef ZSTD_EXCLUDE_BTULTRA_BLOCK_COMPRESSOR static size_t ZSTD_compressBlock_opt2( ZSTD_MatchState_t* ms, SeqStore_t* seqStore, U32 rep[ZSTD_REP_NUM], - const void* src, size_t srcSize, const ZSTD_dictMode_e dictMode) + const void* src, size_t srcSize, const ZSTD_DictMode_e dictMode) { return ZSTD_compressBlock_opt_generic(ms, seqStore, rep, src, srcSize, 2 /* optLevel */, dictMode); } diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index dbaffe53643..4de6375131b 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -1232,7 +1232,7 @@ static size_t ZSTDMT_computeOverlapSize(const ZSTD_CCtx_params* params) size_t ZSTDMT_initCStream_internal( ZSTDMT_CCtx* mtctx, - const void* dict, size_t dictSize, ZSTD_dictContentType_e dictContentType, + const void* dict, size_t dictSize, ZSTD_dictContentType_e dictContentType, ZSTD_dictLoadMethod_e dictLoadMethod, const ZSTD_CDict* cdict, ZSTD_CCtx_params params, unsigned long long pledgedSrcSize) { @@ -1261,7 +1261,7 @@ size_t ZSTDMT_initCStream_internal( ZSTD_freeCDict(mtctx->cdictLocal); if (dict) { mtctx->cdictLocal = ZSTD_createCDict_advanced(dict, dictSize, - ZSTD_dlm_byCopy, dictContentType, /* note : a loadPrefix becomes an internal CDict */ + dictLoadMethod, dictContentType, /* note : a loadPrefix becomes an internal CDict */ params.cParams, mtctx->cMem); mtctx->cdict = mtctx->cdictLocal; if (mtctx->cdictLocal == NULL) return ERROR(memory_allocation); diff --git a/lib/compress/zstdmt_compress.h b/lib/compress/zstdmt_compress.h index 91b489b9cb4..2158d614f30 100644 --- a/lib/compress/zstdmt_compress.h +++ b/lib/compress/zstdmt_compress.h @@ -64,7 +64,7 @@ size_t ZSTDMT_nextInputSizeHint(const ZSTDMT_CCtx* 
mtctx); * even if they are not needed for the current compression. * @return : 0, or an error code */ size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* mtctx, - const void* dict, size_t dictSize, ZSTD_dictContentType_e dictContentType, + const void* dict, size_t dictSize, ZSTD_dictContentType_e dictContentType, ZSTD_dictLoadMethod_e dictLoadMethod, const ZSTD_CDict* cdict, ZSTD_CCtx_params params, unsigned long long pledgedSrcSize); From 7406d2b6eb91851db6a1cef10121de2a4c5a794a Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Fri, 31 Jan 2025 20:57:21 -0800 Subject: [PATCH 6/9] skips the need to create a temporary cdict for --patch-from thus saving a bit of memory and a little bit of cpu time --- lib/compress/zstdmt_compress.c | 81 ++++++++++++++++++++-------------- 1 file changed, 48 insertions(+), 33 deletions(-) diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 4de6375131b..210ff63fa57 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -650,25 +650,25 @@ static void ZSTDMT_serialState_ensureFinished(SerialState* serialState, static const Range kNullRange = { NULL, 0 }; typedef struct { - size_t consumed; /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx */ - size_t cSize; /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx, then set0 by mtctx */ - ZSTD_pthread_mutex_t job_mutex; /* Thread-safe - used by mtctx and worker */ - ZSTD_pthread_cond_t job_cond; /* Thread-safe - used by mtctx and worker */ - ZSTDMT_CCtxPool* cctxPool; /* Thread-safe - used by mtctx and (all) workers */ - ZSTDMT_bufferPool* bufPool; /* Thread-safe - used by mtctx and (all) workers */ - ZSTDMT_seqPool* seqPool; /* Thread-safe - used by mtctx and (all) workers */ + size_t consumed; /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx */ + size_t cSize; /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx, then set0 by mtctx */ + ZSTD_pthread_mutex_t job_mutex; 
/* Thread-safe - used by mtctx and worker */ + ZSTD_pthread_cond_t job_cond; /* Thread-safe - used by mtctx and worker */ + ZSTDMT_CCtxPool* cctxPool; /* Thread-safe - used by mtctx and (all) workers */ + ZSTDMT_bufferPool* bufPool; /* Thread-safe - used by mtctx and (all) workers */ + ZSTDMT_seqPool* seqPool; /* Thread-safe - used by mtctx and (all) workers */ SerialState* serial; /* Thread-safe - used by mtctx and (all) workers */ Buffer dstBuff; /* set by worker (or mtctx), then read by worker & mtctx, then modified by mtctx => no barrier */ Range prefix; /* set by mtctx, then read by worker & mtctx => no barrier */ Range src; /* set by mtctx, then read by worker & mtctx => no barrier */ - unsigned jobID; /* set by mtctx, then read by worker => no barrier */ - unsigned firstJob; /* set by mtctx, then read by worker => no barrier */ - unsigned lastJob; /* set by mtctx, then read by worker => no barrier */ - ZSTD_CCtx_params params; /* set by mtctx, then read by worker => no barrier */ - const ZSTD_CDict* cdict; /* set by mtctx, then read by worker => no barrier */ - unsigned long long fullFrameSize; /* set by mtctx, then read by worker => no barrier */ - size_t dstFlushed; /* used only by mtctx */ - unsigned frameChecksumNeeded; /* used only by mtctx */ + unsigned jobID; /* set by mtctx, then read by worker => no barrier */ + unsigned firstJob; /* set by mtctx, then read by worker => no barrier */ + unsigned lastJob; /* set by mtctx, then read by worker => no barrier */ + ZSTD_CCtx_params params; /* set by mtctx, then read by worker => no barrier */ + const ZSTD_CDict* cdict; /* set by mtctx, then read by worker => no barrier */ + unsigned long long fullFrameSize; /* set by mtctx, then read by worker => no barrier */ + size_t dstFlushed; /* used only by mtctx */ + unsigned frameChecksumNeeded; /* used only by mtctx */ } ZSTDMT_jobDescription; #define JOB_ERROR(e) \ @@ -714,7 +714,7 @@ static void ZSTDMT_compressionJob(void* jobDescription) size_t const initError 
= ZSTD_compressBegin_advanced_internal(cctx, NULL, 0, ZSTD_dct_auto, ZSTD_dtlm_fast, job->cdict, &jobParams, job->fullFrameSize); assert(job->firstJob); /* only allowed for first job */ if (ZSTD_isError(initError)) JOB_ERROR(initError); - } else { /* srcStart points at reloaded section */ + } else { U64 const pledgedSrcSize = job->firstJob ? job->fullFrameSize : job->src.size; { size_t const forceWindowError = ZSTD_CCtxParams_setParameter(&jobParams, ZSTD_c_forceMaxWindow, !job->firstJob); if (ZSTD_isError(forceWindowError)) JOB_ERROR(forceWindowError); @@ -741,7 +741,7 @@ static void ZSTDMT_compressionJob(void* jobDescription) ZSTD_invalidateRepCodes(cctx); } - /* compress */ + /* compress the entire job by smaller chunks, for better granularity */ { size_t const chunkSize = 4*ZSTD_BLOCKSIZE_MAX; int const nbChunks = (int)((job->src.size + (chunkSize-1)) / chunkSize); const BYTE* ip = (const BYTE*) job->src.start; @@ -1258,18 +1258,6 @@ size_t ZSTDMT_initCStream_internal( mtctx->params = params; mtctx->frameContentSize = pledgedSrcSize; - ZSTD_freeCDict(mtctx->cdictLocal); - if (dict) { - mtctx->cdictLocal = ZSTD_createCDict_advanced(dict, dictSize, - dictLoadMethod, dictContentType, /* note : a loadPrefix becomes an internal CDict */ - params.cParams, mtctx->cMem); - mtctx->cdict = mtctx->cdictLocal; - if (mtctx->cdictLocal == NULL) return ERROR(memory_allocation); - } else { - mtctx->cdictLocal = NULL; - mtctx->cdict = cdict; - } - mtctx->targetPrefixSize = ZSTDMT_computeOverlapSize(¶ms); DEBUGLOG(4, "overlapLog=%i => %u KB", params.overlapLog, (U32)(mtctx->targetPrefixSize>>10)); mtctx->targetSectionSize = params.jobSize; @@ -1331,9 +1319,31 @@ size_t ZSTDMT_initCStream_internal( mtctx->allJobsCompleted = 0; mtctx->consumed = 0; mtctx->produced = 0; + + /* update dictionary */ + ZSTD_freeCDict(mtctx->cdictLocal); + mtctx->cdictLocal = NULL; + if (dict) { + if (dictContentType == ZSTD_dct_rawContent) { + mtctx->inBuff.prefix.start = (const BYTE*)dict; + 
mtctx->inBuff.prefix.size = dictSize; + } else { + /* note : a loadPrefix becomes an internal CDict */ + mtctx->cdictLocal = ZSTD_createCDict_advanced(dict, dictSize, + dictLoadMethod, dictContentType, + params.cParams, mtctx->cMem); + mtctx->cdict = mtctx->cdictLocal; + if (mtctx->cdictLocal == NULL) return ERROR(memory_allocation); + } + } else { + mtctx->cdict = cdict; + } + if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params, mtctx->targetSectionSize, dict, dictSize, dictContentType)) return ERROR(memory_allocation); + + return 0; } @@ -1543,6 +1553,11 @@ static Range ZSTDMT_getInputDataInUse(ZSTDMT_CCtx* mtctx) unsigned const lastJobID = mtctx->nextJobID; unsigned jobID; + /* no need to check during first round */ + size_t roundBuffCapacity = mtctx->roundBuff.capacity; + size_t nbJobs1stRoundMin = roundBuffCapacity / mtctx->targetSectionSize; + if (lastJobID < nbJobs1stRoundMin) return kNullRange; + for (jobID = firstJobID; jobID < lastJobID; ++jobID) { unsigned const wJobID = jobID & mtctx->jobIDMask; size_t consumed; @@ -1637,14 +1652,14 @@ static int ZSTDMT_tryGetInputRange(ZSTDMT_CCtx* mtctx) { Range const inUse = ZSTDMT_getInputDataInUse(mtctx); size_t const spaceLeft = mtctx->roundBuff.capacity - mtctx->roundBuff.pos; - size_t const target = mtctx->targetSectionSize; + size_t const spaceNeeded = mtctx->targetSectionSize; Buffer buffer; DEBUGLOG(5, "ZSTDMT_tryGetInputRange"); assert(mtctx->inBuff.buffer.start == NULL); - assert(mtctx->roundBuff.capacity >= target); + assert(mtctx->roundBuff.capacity >= spaceNeeded); - if (spaceLeft < target) { + if (spaceLeft < spaceNeeded) { /* ZSTD_invalidateRepCodes() doesn't work for extDict variants. * Simply copy the prefix to the beginning in that case. 
*/ @@ -1663,7 +1678,7 @@ static int ZSTDMT_tryGetInputRange(ZSTDMT_CCtx* mtctx) mtctx->roundBuff.pos = prefixSize; } buffer.start = mtctx->roundBuff.buffer + mtctx->roundBuff.pos; - buffer.capacity = target; + buffer.capacity = spaceNeeded; if (ZSTDMT_isOverlapped(buffer, inUse)) { DEBUGLOG(5, "Waiting for buffer..."); From f11bd19c7f7899840585a260688e969eb705a008 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Sat, 1 Feb 2025 00:55:52 -0800 Subject: [PATCH 7/9] ensure cdict is properly reset to NULL --- .github/workflows/dev-long-tests.yml | 4 ++-- lib/compress/zstdmt_compress.c | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/.github/workflows/dev-long-tests.yml b/.github/workflows/dev-long-tests.yml index 0adfed91412..899a57b754b 100644 --- a/.github/workflows/dev-long-tests.yml +++ b/.github/workflows/dev-long-tests.yml @@ -67,11 +67,11 @@ jobs: - name: thread sanitizer zstreamtest run: CC=clang ZSTREAM_TESTTIME=-T3mn make tsan-test-zstream - ubsan-zstreamtest: + uasan-zstreamtest: runs-on: ubuntu-latest steps: - uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # tag=v4.1.1 - - name: undefined behavior sanitizer zstreamtest + - name: ub + address sanitizer on zstreamtest run: CC=clang make uasan-test-zstream # lasts ~15mn diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 210ff63fa57..1840ad85535 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -1323,6 +1323,7 @@ size_t ZSTDMT_initCStream_internal( /* update dictionary */ ZSTD_freeCDict(mtctx->cdictLocal); mtctx->cdictLocal = NULL; + mtctx->cdict = NULL; if (dict) { if (dictContentType == ZSTD_dct_rawContent) { mtctx->inBuff.prefix.start = (const BYTE*)dict; From c7cd7dc04bede050475da32fa019c2d0712ed6cf Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Sat, 1 Feb 2025 00:41:36 -0800 Subject: [PATCH 8/9] better MT fluidity --patch-from no longer blocked on first job dictionary loading --- 
lib/compress/zstd_compress.c | 4 ++-- lib/compress/zstdmt_compress.c | 40 ++++++++++++++++++++++++---------- 2 files changed, 30 insertions(+), 14 deletions(-) diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index d7585c884a3..c560c902a54 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -4900,7 +4900,7 @@ size_t ZSTD_compressBlock(ZSTD_CCtx* cctx, void* dst, size_t dstCapacity, const /*! ZSTD_loadDictionaryContent() : * @return : 0, or an error code */ -static size_t +static size_t ZSTD_loadDictionaryContent(ZSTD_MatchState_t* ms, ldmState_t* ls, ZSTD_cwksp* ws, @@ -5603,7 +5603,7 @@ static size_t ZSTD_initCDict_internal( return 0; } -static ZSTD_CDict* +static ZSTD_CDict* ZSTD_createCDict_advanced_internal(size_t dictSize, ZSTD_dictLoadMethod_e dictLoadMethod, ZSTD_compressionParameters cParams, diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 1840ad85535..8725b90716f 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -577,9 +577,10 @@ static void ZSTDMT_serialState_free(SerialState* serialState) ZSTD_customFree(serialState->ldmState.bucketOffsets, cMem); } -static void ZSTDMT_serialState_update(SerialState* serialState, - ZSTD_CCtx* jobCCtx, RawSeqStore_t seqStore, - Range src, unsigned jobID) +static void +ZSTDMT_serialState_genSequences(SerialState* serialState, + RawSeqStore_t* seqStore, + Range src, unsigned jobID) { /* Wait for our turn */ ZSTD_PTHREAD_MUTEX_LOCK(&serialState->mutex); @@ -592,12 +593,13 @@ static void ZSTDMT_serialState_update(SerialState* serialState, /* It is now our turn, do any processing necessary */ if (serialState->params.ldmParams.enableLdm == ZSTD_ps_enable) { size_t error; - assert(seqStore.seq != NULL && seqStore.pos == 0 && - seqStore.size == 0 && seqStore.capacity > 0); + DEBUGLOG(6, "ZSTDMT_serialState_genSequences: LDM update"); + assert(seqStore->seq != NULL && seqStore->pos == 0 && + seqStore->size == 0 
&& seqStore->capacity > 0); assert(src.size <= serialState->params.jobSize); ZSTD_window_update(&serialState->ldmState.window, src.start, src.size, /* forceNonContiguous */ 0); error = ZSTD_ldm_generateSequences( - &serialState->ldmState, &seqStore, + &serialState->ldmState, seqStore, &serialState->params.ldmParams, src.start, src.size); /* We provide a large enough buffer to never fail. */ assert(!ZSTD_isError(error)); (void)error; @@ -616,10 +618,18 @@ static void ZSTDMT_serialState_update(SerialState* serialState, serialState->nextJobID++; ZSTD_pthread_cond_broadcast(&serialState->cond); ZSTD_pthread_mutex_unlock(&serialState->mutex); +} - if (seqStore.size > 0) { - ZSTD_referenceExternalSequences(jobCCtx, seqStore.seq, seqStore.size); - assert(serialState->params.ldmParams.enableLdm == ZSTD_ps_enable); +static void +ZSTDMT_serialState_applySequences(const SerialState* serialState, /* just for an assert() check */ + ZSTD_CCtx* jobCCtx, + const RawSeqStore_t* seqStore) +{ + if (seqStore->size > 0) { + DEBUGLOG(5, "ZSTDMT_serialState_applySequences: uploading %u external sequences", (unsigned)seqStore->size); + assert(serialState->params.ldmParams.enableLdm == ZSTD_ps_enable); (void)serialState; + assert(jobCCtx); + ZSTD_referenceExternalSequences(jobCCtx, seqStore->seq, seqStore->size); } } @@ -689,6 +699,7 @@ static void ZSTDMT_compressionJob(void* jobDescription) Buffer dstBuff = job->dstBuff; size_t lastCBlockSize = 0; + DEBUGLOG(5, "ZSTDMT_compressionJob: job %u", job->jobID); /* resources */ if (cctx==NULL) JOB_ERROR(ERROR(memory_allocation)); if (dstBuff.start == NULL) { /* streaming job : doesn't provide a dstBuffer */ @@ -710,6 +721,10 @@ static void ZSTDMT_compressionJob(void* jobDescription) /* init */ + + /* Perform serial step as early as possible */ + ZSTDMT_serialState_genSequences(job->serial, &rawSeqStore, job->src, job->jobID); + if (job->cdict) { size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, NULL, 0, ZSTD_dct_auto, 
ZSTD_dtlm_fast, job->cdict, &jobParams, job->fullFrameSize); assert(job->firstJob); /* only allowed for first job */ @@ -723,16 +738,17 @@ static void ZSTDMT_compressionJob(void* jobDescription) size_t const err = ZSTD_CCtxParams_setParameter(&jobParams, ZSTD_c_deterministicRefPrefix, 0); if (ZSTD_isError(err)) JOB_ERROR(err); } + DEBUGLOG(6, "ZSTDMT_compressionJob: job %u: loading prefix of size %zu", job->jobID, job->prefix.size); { size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, - job->prefix.start, job->prefix.size, ZSTD_dct_rawContent, /* load dictionary in "content-only" mode (no header analysis) */ + job->prefix.start, job->prefix.size, ZSTD_dct_rawContent, ZSTD_dtlm_fast, NULL, /*cdict*/ &jobParams, pledgedSrcSize); if (ZSTD_isError(initError)) JOB_ERROR(initError); } } - /* Perform serial step as early as possible, but after CCtx initialization */ - ZSTDMT_serialState_update(job->serial, cctx, rawSeqStore, job->src, job->jobID); + /* External Sequences can only be applied after CCtx initialization */ + ZSTDMT_serialState_applySequences(job->serial, cctx, &rawSeqStore); if (!job->firstJob) { /* flush and overwrite frame header when it's not first job */ size_t const hSize = ZSTD_compressContinue_public(cctx, dstBuff.start, dstBuff.capacity, job->src.start, 0); From 23e5f80390db9a3a65485933d255e163d2dab519 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Wed, 5 Feb 2025 18:47:26 -0800 Subject: [PATCH 9/9] Revert "pass dictionary loading method as parameter" This reverts commit 821fc567f93a415e9fbe856271ccd452ee7acf07. 
--- lib/compress/zstd_compress.c | 12 +++++------- lib/compress/zstd_compress_internal.h | 27 +++++++++++++-------------- lib/compress/zstd_lazy.c | 16 ++++++++-------- lib/compress/zstd_opt.c | 14 +++++++------- lib/compress/zstdmt_compress.c | 16 ++++++++++++++-- lib/compress/zstdmt_compress.h | 2 +- 6 files changed, 48 insertions(+), 39 deletions(-) diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index c560c902a54..d928b1d3ee3 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -1354,12 +1354,10 @@ size_t ZSTD_CCtx_refPrefix_advanced( RETURN_ERROR_IF(cctx->streamStage != zcss_init, stage_wrong, "Can't ref a prefix when ctx not in init stage."); ZSTD_clearAllDicts(cctx); - if (prefixSize > 0) { - RETURN_ERROR_IF(prefix == NULL, dictionary_wrong, "Invalid prefix pointer"); + if (prefix != NULL && prefixSize > 0) { cctx->prefixDict.dict = prefix; cctx->prefixDict.dictSize = prefixSize; cctx->prefixDict.dictContentType = dictContentType; - cctx->prefixDict.loadMethod = ZSTD_dlm_byRef; } return 0; } @@ -3068,7 +3066,7 @@ ZSTD_entropyCompressSeqStore( /* ZSTD_selectBlockCompressor() : * Not static, but internal use only (used by long distance matcher) * assumption : strat is a valid strategy */ -ZSTD_BlockCompressor_f ZSTD_selectBlockCompressor(ZSTD_strategy strat, ZSTD_ParamSwitch_e useRowMatchFinder, ZSTD_DictMode_e dictMode) +ZSTD_BlockCompressor_f ZSTD_selectBlockCompressor(ZSTD_strategy strat, ZSTD_ParamSwitch_e useRowMatchFinder, ZSTD_dictMode_e dictMode) { static const ZSTD_BlockCompressor_f blockCompressor[4][ZSTD_STRATEGY_MAX+1] = { { ZSTD_compressBlock_fast /* default for 0 */, @@ -3300,7 +3298,7 @@ static size_t ZSTD_buildSeqStore(ZSTD_CCtx* zc, const void* src, size_t srcSize) } /* select and store sequences */ - { ZSTD_DictMode_e const dictMode = ZSTD_matchState_dictMode(ms); + { ZSTD_dictMode_e const dictMode = ZSTD_matchState_dictMode(ms); size_t lastLLSize; { int i; for (i = 0; i < ZSTD_REP_NUM; ++i) 
@@ -6353,7 +6351,7 @@ static size_t ZSTD_CCtx_init_compressStream2(ZSTD_CCtx* cctx, size_t inSize) { ZSTD_CCtx_params params = cctx->requestedParams; - ZSTD_PrefixDict const prefixDict = cctx->prefixDict; + ZSTD_prefixDict const prefixDict = cctx->prefixDict; FORWARD_IF_ERROR( ZSTD_initLocalDict(cctx) , ""); /* Init the local dict if present. */ ZSTD_memset(&cctx->prefixDict, 0, sizeof(cctx->prefixDict)); /* single usage */ assert(prefixDict.dict==NULL || cctx->cdict==NULL); /* only one can be set */ @@ -6409,7 +6407,7 @@ static size_t ZSTD_CCtx_init_compressStream2(ZSTD_CCtx* cctx, DEBUGLOG(4, "call ZSTDMT_initCStream_internal as nbWorkers=%u", params.nbWorkers); FORWARD_IF_ERROR( ZSTDMT_initCStream_internal( cctx->mtctx, - prefixDict.dict, prefixDict.dictSize, prefixDict.dictContentType, prefixDict.loadMethod, + prefixDict.dict, prefixDict.dictSize, prefixDict.dictContentType, cctx->cdict, params, cctx->pledgedSrcSizePlusOne-1) , ""); cctx->dictID = cctx->cdict ? cctx->cdict->dictID : 0; cctx->dictContentSize = cctx->cdict ? cctx->cdict->dictContentSize : prefixDict.dictSize; diff --git a/lib/compress/zstd_compress_internal.h b/lib/compress/zstd_compress_internal.h index f71c47e880c..ca5e2a4c5bf 100644 --- a/lib/compress/zstd_compress_internal.h +++ b/lib/compress/zstd_compress_internal.h @@ -12,8 +12,8 @@ * that shall **only** be used by modules within lib/compress. 
*/ -#ifndef ZSTD_COMPRESS_INTERNAL_H -#define ZSTD_COMPRESS_INTERNAL_H +#ifndef ZSTD_COMPRESS_H +#define ZSTD_COMPRESS_H /*-************************************* * Dependencies @@ -21,10 +21,10 @@ #include "../common/zstd_internal.h" #include "zstd_cwksp.h" #ifdef ZSTD_MULTITHREAD -# include "zstdmt_compress.h" /* ZSTDMT_CCtx */ +# include "zstdmt_compress.h" #endif #include "../common/bits.h" /* ZSTD_highbit32, ZSTD_NbCommonBytes */ -#include "zstd_preSplit.h" /* ZSTD_SLIPBLOCK_WORKSPACESIZE */ +#include "zstd_preSplit.h" /* ZSTD_SLIPBLOCK_WORKSPACESIZE */ /*-************************************* * Constants @@ -46,12 +46,11 @@ typedef enum { ZSTDcs_created=0, ZSTDcs_init, ZSTDcs_ongoing, ZSTDcs_ending } ZSTD_compressionStage_e; typedef enum { zcss_init=0, zcss_load, zcss_flush } ZSTD_cStreamStage; -typedef struct { +typedef struct ZSTD_prefixDict_s { const void* dict; size_t dictSize; ZSTD_dictContentType_e dictContentType; - ZSTD_dictLoadMethod_e loadMethod; -} ZSTD_PrefixDict; +} ZSTD_prefixDict; typedef struct { void* dictBuffer; @@ -468,7 +467,7 @@ typedef struct { U32 partitions[ZSTD_MAX_NB_BLOCK_SPLITS]; ZSTD_entropyCTablesMetadata_t entropyMetadata; -} ZSTD_BlockSplitCtx; +} ZSTD_blockSplitCtx; struct ZSTD_CCtx_s { ZSTD_compressionStage_e stage; @@ -526,7 +525,7 @@ struct ZSTD_CCtx_s { /* Dictionary */ ZSTD_localDict localDict; const ZSTD_CDict* cdict; - ZSTD_PrefixDict prefixDict; /* single-usage dictionary */ + ZSTD_prefixDict prefixDict; /* single-usage dictionary */ /* Multi-threading */ #ifdef ZSTD_MULTITHREAD @@ -539,7 +538,7 @@ struct ZSTD_CCtx_s { #endif /* Workspace for block splitter */ - ZSTD_BlockSplitCtx blockSplitCtx; + ZSTD_blockSplitCtx blockSplitCtx; /* Buffer for output from external sequence producer */ ZSTD_Sequence* extSeqBuf; @@ -554,7 +553,7 @@ typedef enum { ZSTD_extDict = 1, ZSTD_dictMatchState = 2, ZSTD_dedicatedDictSearch = 3 -} ZSTD_DictMode_e; +} ZSTD_dictMode_e; typedef enum { ZSTD_cpm_noAttachDict = 0, /* Compression with 
ZSTD_noDict or ZSTD_extDict. @@ -579,7 +578,7 @@ typedef enum { typedef size_t (*ZSTD_BlockCompressor_f) ( ZSTD_MatchState_t* bs, SeqStore_t* seqStore, U32 rep[ZSTD_REP_NUM], void const* src, size_t srcSize); -ZSTD_BlockCompressor_f ZSTD_selectBlockCompressor(ZSTD_strategy strat, ZSTD_ParamSwitch_e rowMatchfinderMode, ZSTD_DictMode_e dictMode); +ZSTD_BlockCompressor_f ZSTD_selectBlockCompressor(ZSTD_strategy strat, ZSTD_ParamSwitch_e rowMatchfinderMode, ZSTD_dictMode_e dictMode); MEM_STATIC U32 ZSTD_LLcode(U32 litLength) @@ -1069,7 +1068,7 @@ MEM_STATIC U32 ZSTD_window_hasExtDict(ZSTD_window_t const window) * Inspects the provided matchState and figures out what dictMode should be * passed to the compressor. */ -MEM_STATIC ZSTD_DictMode_e ZSTD_matchState_dictMode(const ZSTD_MatchState_t *ms) +MEM_STATIC ZSTD_dictMode_e ZSTD_matchState_dictMode(const ZSTD_MatchState_t *ms) { return ZSTD_window_hasExtDict(ms->window) ? ZSTD_extDict : @@ -1634,4 +1633,4 @@ size_t ZSTD_compressEnd_public(ZSTD_CCtx* cctx, size_t ZSTD_compressBlock_deprecated(ZSTD_CCtx* cctx, void* dst, size_t dstCapacity, const void* src, size_t srcSize); -#endif /* ZSTD_COMPRESS_INTERNAL_H */ +#endif /* ZSTD_COMPRESS_H */ diff --git a/lib/compress/zstd_lazy.c b/lib/compress/zstd_lazy.c index 1d5b37c20cc..272ebe0ece7 100644 --- a/lib/compress/zstd_lazy.c +++ b/lib/compress/zstd_lazy.c @@ -74,7 +74,7 @@ ZSTD_ALLOW_POINTER_OVERFLOW_ATTR void ZSTD_insertDUBT1(const ZSTD_MatchState_t* ms, U32 curr, const BYTE* inputEnd, U32 nbCompares, U32 btLow, - const ZSTD_DictMode_e dictMode) + const ZSTD_dictMode_e dictMode) { const ZSTD_compressionParameters* const cParams = &ms->cParams; U32* const bt = ms->chainTable; @@ -168,7 +168,7 @@ size_t ZSTD_DUBT_findBetterDictMatch ( size_t bestLength, U32 nbCompares, U32 const mls, - const ZSTD_DictMode_e dictMode) + const ZSTD_dictMode_e dictMode) { const ZSTD_MatchState_t * const dms = ms->dictMatchState; const ZSTD_compressionParameters* const dmsCParams = 
&dms->cParams; @@ -244,7 +244,7 @@ size_t ZSTD_DUBT_findBestMatch(ZSTD_MatchState_t* ms, const BYTE* const ip, const BYTE* const iend, size_t* offBasePtr, U32 const mls, - const ZSTD_DictMode_e dictMode) + const ZSTD_dictMode_e dictMode) { const ZSTD_compressionParameters* const cParams = &ms->cParams; U32* const hashTable = ms->hashTable; @@ -396,7 +396,7 @@ size_t ZSTD_BtFindBestMatch( ZSTD_MatchState_t* ms, const BYTE* const ip, const BYTE* const iLimit, size_t* offBasePtr, const U32 mls /* template */, - const ZSTD_DictMode_e dictMode) + const ZSTD_dictMode_e dictMode) { DEBUGLOG(7, "ZSTD_BtFindBestMatch"); if (ip < ms->window.base + ms->nextToUpdate) return 0; /* skipped area */ @@ -668,7 +668,7 @@ size_t ZSTD_HcFindBestMatch( ZSTD_MatchState_t* ms, const BYTE* const ip, const BYTE* const iLimit, size_t* offsetPtr, - const U32 mls, const ZSTD_DictMode_e dictMode) + const U32 mls, const ZSTD_dictMode_e dictMode) { const ZSTD_compressionParameters* const cParams = &ms->cParams; U32* const chainTable = ms->chainTable; @@ -1142,7 +1142,7 @@ size_t ZSTD_RowFindBestMatch( ZSTD_MatchState_t* ms, const BYTE* const ip, const BYTE* const iLimit, size_t* offsetPtr, - const U32 mls, const ZSTD_DictMode_e dictMode, + const U32 mls, const ZSTD_dictMode_e dictMode, const U32 rowLog) { U32* const hashTable = ms->hashTable; @@ -1492,7 +1492,7 @@ FORCE_INLINE_TEMPLATE size_t ZSTD_searchMax( U32 const mls, U32 const rowLog, searchMethod_e const searchMethod, - ZSTD_DictMode_e const dictMode) + ZSTD_dictMode_e const dictMode) { if (dictMode == ZSTD_noDict) { ZSTD_SWITCH_SEARCH_METHOD(noDict) @@ -1518,7 +1518,7 @@ size_t ZSTD_compressBlock_lazy_generic( U32 rep[ZSTD_REP_NUM], const void* src, size_t srcSize, const searchMethod_e searchMethod, const U32 depth, - ZSTD_DictMode_e const dictMode) + ZSTD_dictMode_e const dictMode) { const BYTE* const istart = (const BYTE*)src; const BYTE* ip = istart; diff --git a/lib/compress/zstd_opt.c b/lib/compress/zstd_opt.c index 
09de5a9d9be..3d7171b755b 100644 --- a/lib/compress/zstd_opt.c +++ b/lib/compress/zstd_opt.c @@ -562,7 +562,7 @@ ZSTD_ALLOW_POINTER_OVERFLOW_ATTR void ZSTD_updateTree_internal( ZSTD_MatchState_t* ms, const BYTE* const ip, const BYTE* const iend, - const U32 mls, const ZSTD_DictMode_e dictMode) + const U32 mls, const ZSTD_dictMode_e dictMode) { const BYTE* const base = ms->window.base; U32 const target = (U32)(ip - base); @@ -592,7 +592,7 @@ ZSTD_insertBtAndGetAllMatches ( ZSTD_MatchState_t* ms, U32* nextToUpdate3, const BYTE* const ip, const BYTE* const iLimit, - const ZSTD_DictMode_e dictMode, + const ZSTD_dictMode_e dictMode, const U32 rep[ZSTD_REP_NUM], const U32 ll0, /* tells if associated literal length is 0 or not. This value must be 0 or 1 */ const U32 lengthToBeat, @@ -838,7 +838,7 @@ U32 ZSTD_btGetAllMatches_internal( const U32 rep[ZSTD_REP_NUM], U32 const ll0, U32 const lengthToBeat, - const ZSTD_DictMode_e dictMode, + const ZSTD_dictMode_e dictMode, const U32 mls) { assert(BOUNDED(3, ms->cParams.minMatch, 6) == mls); @@ -886,7 +886,7 @@ GEN_ZSTD_BT_GET_ALL_MATCHES(dictMatchState) } static ZSTD_getAllMatchesFn -ZSTD_selectBtGetAllMatches(ZSTD_MatchState_t const* ms, ZSTD_DictMode_e const dictMode) +ZSTD_selectBtGetAllMatches(ZSTD_MatchState_t const* ms, ZSTD_dictMode_e const dictMode) { ZSTD_getAllMatchesFn const getAllMatchesFns[3][4] = { ZSTD_BT_GET_ALL_MATCHES_ARRAY(noDict), @@ -1079,7 +1079,7 @@ ZSTD_compressBlock_opt_generic(ZSTD_MatchState_t* ms, U32 rep[ZSTD_REP_NUM], const void* src, size_t srcSize, const int optLevel, - const ZSTD_DictMode_e dictMode) + const ZSTD_dictMode_e dictMode) { optState_t* const optStatePtr = &ms->opt; const BYTE* const istart = (const BYTE*)src; @@ -1445,7 +1445,7 @@ ZSTD_compressBlock_opt_generic(ZSTD_MatchState_t* ms, #ifndef ZSTD_EXCLUDE_BTOPT_BLOCK_COMPRESSOR static size_t ZSTD_compressBlock_opt0( ZSTD_MatchState_t* ms, SeqStore_t* seqStore, U32 rep[ZSTD_REP_NUM], - const void* src, size_t srcSize, const 
ZSTD_DictMode_e dictMode) + const void* src, size_t srcSize, const ZSTD_dictMode_e dictMode) { return ZSTD_compressBlock_opt_generic(ms, seqStore, rep, src, srcSize, 0 /* optLevel */, dictMode); } @@ -1454,7 +1454,7 @@ static size_t ZSTD_compressBlock_opt0( #ifndef ZSTD_EXCLUDE_BTULTRA_BLOCK_COMPRESSOR static size_t ZSTD_compressBlock_opt2( ZSTD_MatchState_t* ms, SeqStore_t* seqStore, U32 rep[ZSTD_REP_NUM], - const void* src, size_t srcSize, const ZSTD_DictMode_e dictMode) + const void* src, size_t srcSize, const ZSTD_dictMode_e dictMode) { return ZSTD_compressBlock_opt_generic(ms, seqStore, rep, src, srcSize, 2 /* optLevel */, dictMode); } diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 8725b90716f..0f1fe6d7469 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -1248,7 +1248,7 @@ static size_t ZSTDMT_computeOverlapSize(const ZSTD_CCtx_params* params) size_t ZSTDMT_initCStream_internal( ZSTDMT_CCtx* mtctx, - const void* dict, size_t dictSize, ZSTD_dictContentType_e dictContentType, ZSTD_dictLoadMethod_e dictLoadMethod, + const void* dict, size_t dictSize, ZSTD_dictContentType_e dictContentType, const ZSTD_CDict* cdict, ZSTD_CCtx_params params, unsigned long long pledgedSrcSize) { @@ -1274,6 +1274,18 @@ size_t ZSTDMT_initCStream_internal( mtctx->params = params; mtctx->frameContentSize = pledgedSrcSize; + ZSTD_freeCDict(mtctx->cdictLocal); + if (dict) { + mtctx->cdictLocal = ZSTD_createCDict_advanced(dict, dictSize, + ZSTD_dlm_byCopy, dictContentType, /* note : a loadPrefix becomes an internal CDict */ + params.cParams, mtctx->cMem); + mtctx->cdict = mtctx->cdictLocal; + if (mtctx->cdictLocal == NULL) return ERROR(memory_allocation); + } else { + mtctx->cdictLocal = NULL; + mtctx->cdict = cdict; + } + mtctx->targetPrefixSize = ZSTDMT_computeOverlapSize(&params); DEBUGLOG(4, "overlapLog=%i => %u KB", params.overlapLog, (U32)(mtctx->targetPrefixSize>>10)); mtctx->targetSectionSize = params.jobSize; @@
-1347,7 +1359,7 @@ size_t ZSTDMT_initCStream_internal( } else { /* note : a loadPrefix becomes an internal CDict */ mtctx->cdictLocal = ZSTD_createCDict_advanced(dict, dictSize, - dictLoadMethod, dictContentType, + ZSTD_dlm_byRef, dictContentType, params.cParams, mtctx->cMem); mtctx->cdict = mtctx->cdictLocal; if (mtctx->cdictLocal == NULL) return ERROR(memory_allocation); diff --git a/lib/compress/zstdmt_compress.h b/lib/compress/zstdmt_compress.h index 2158d614f30..91b489b9cb4 100644 --- a/lib/compress/zstdmt_compress.h +++ b/lib/compress/zstdmt_compress.h @@ -64,7 +64,7 @@ size_t ZSTDMT_nextInputSizeHint(const ZSTDMT_CCtx* mtctx); * even if they are not needed for the current compression. * @return : 0, or an error code */ size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* mtctx, - const void* dict, size_t dictSize, ZSTD_dictContentType_e dictContentType, ZSTD_dictLoadMethod_e dictLoadMethod, + const void* dict, size_t dictSize, ZSTD_dictContentType_e dictContentType, const ZSTD_CDict* cdict, ZSTD_CCtx_params params, unsigned long long pledgedSrcSize);